Preparing for DataType feature update and merge with IPv6 update
This commit is contained in:
parent
13d32e8687
commit
71e0dcef47
3 changed files with 165 additions and 34 deletions
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "sslrelay"
|
name = "sslrelay"
|
||||||
version = "0.3.0"
|
version = "0.3.1"
|
||||||
authors = ["PinkP4nther <pinkp4nther@protonmail.com>"]
|
authors = ["PinkP4nther <pinkp4nther@protonmail.com>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
|
|
|
||||||
151
src/data.rs
151
src/data.rs
|
|
@ -1,5 +1,5 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
|
use openssl::ssl::{Ssl, SslConnector, SslMethod, SslStream, SslVerifyMode};
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use std::net::{TcpStream, Shutdown};
|
use std::net::{TcpStream, Shutdown};
|
||||||
use std::sync::mpsc::{self, Receiver, Sender};
|
use std::sync::mpsc::{self, Receiver, Sender};
|
||||||
|
|
@ -22,8 +22,37 @@ enum DataPipe {
|
||||||
Shutdown,
|
Shutdown,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct DownStreamInner {
|
/*
|
||||||
ds_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
trait DataStream {}
|
||||||
|
impl<S: Read + Write> DataStream for S {}
|
||||||
|
*/
|
||||||
|
enum DataStreamType {
|
||||||
|
RAW(TcpStream),
|
||||||
|
TLS(SslStream<TcpStream>),
|
||||||
|
}
|
||||||
|
|
||||||
|
//impl Read for DataStreamType {}
|
||||||
|
//impl Write for DataStreamType {}
|
||||||
|
/*
|
||||||
|
impl DataStreamType {
|
||||||
|
fn shutdown(&self) {
|
||||||
|
match self {
|
||||||
|
&Self::RAW(r) => {
|
||||||
|
r.shutdown(Shutdown::Both);
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
&Self::TLS(t) => {
|
||||||
|
t.shutdown();
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
struct DownStreamInner
|
||||||
|
{
|
||||||
|
//ds_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
||||||
|
ds_stream: Option<Arc<Mutex<DataStreamType>>>,
|
||||||
internal_data_buffer: Vec<u8>,
|
internal_data_buffer: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -31,11 +60,97 @@ impl DownStreamInner {
|
||||||
|
|
||||||
fn handle_error(&self, error_description: &str) {
|
fn handle_error(&self, error_description: &str) {
|
||||||
println!("[SSLRelay DownStream Thread Error]: {}", error_description);
|
println!("[SSLRelay DownStream Thread Error]: {}", error_description);
|
||||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn ds_handler(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
pub fn ds_handler(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_raw(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
||||||
|
loop {
|
||||||
|
|
||||||
|
match data_in.recv_timeout(Duration::from_millis(50)) {
|
||||||
|
|
||||||
|
// DataPipe Received
|
||||||
|
Ok(data_received) => {
|
||||||
|
|
||||||
|
match data_received {
|
||||||
|
DataPipe::DataWrite(data) => {
|
||||||
|
|
||||||
|
let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() {
|
||||||
|
Ok(sl) => sl,
|
||||||
|
Err(_e) => {
|
||||||
|
self.handle_error("Failed to get stream lock!");
|
||||||
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
|
||||||
|
self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match stream_lock.write_all(&data) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(_e) => {
|
||||||
|
self.handle_error("Failed to write data to DownStream tcp stream!");
|
||||||
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
|
||||||
|
self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let _ = stream_lock.flush();
|
||||||
|
drop(stream_lock);
|
||||||
|
|
||||||
|
},
|
||||||
|
DataPipe::Shutdown => {
|
||||||
|
self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown();
|
||||||
|
return;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_e) => {
|
||||||
|
match _e {
|
||||||
|
mpsc::RecvTimeoutError::Timeout => {},
|
||||||
|
mpsc::RecvTimeoutError::Disconnected => {
|
||||||
|
self.handle_error("DownStream data_in channel is disconnected!");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}// End of data_in receive
|
||||||
|
|
||||||
|
// If received data
|
||||||
|
if let Some(byte_count) = self.get_data_stream() {
|
||||||
|
if byte_count > 0 {
|
||||||
|
|
||||||
|
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) {
|
||||||
|
self.handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.internal_data_buffer.clear();
|
||||||
|
|
||||||
|
} else if byte_count == 0 || byte_count == -2 {
|
||||||
|
|
||||||
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
|
||||||
|
self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str());
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
} else if byte_count == -1 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_tls(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
||||||
match data_in.recv_timeout(Duration::from_millis(50)) {
|
match data_in.recv_timeout(Duration::from_millis(50)) {
|
||||||
|
|
@ -72,7 +187,7 @@ impl DownStreamInner {
|
||||||
|
|
||||||
},
|
},
|
||||||
DataPipe::Shutdown => {
|
DataPipe::Shutdown => {
|
||||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown();
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
@ -173,8 +288,9 @@ impl DownStreamInner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct UpStreamInner{
|
struct UpStreamInner
|
||||||
us_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
{
|
||||||
|
us_stream: Option<Arc<Mutex<DataStreamType>>>,
|
||||||
internal_data_buffer: Vec<u8>
|
internal_data_buffer: Vec<u8>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -325,9 +441,11 @@ pub struct FullDuplexTcp<H>
|
||||||
where
|
where
|
||||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
||||||
{
|
{
|
||||||
ds_tcp_stream: Arc<Mutex<SslStream<TcpStream>>>,
|
ds_tcp_stream: Arc<Mutex<DataStreamType>>,
|
||||||
us_tcp_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
us_tcp_stream: Option<Arc<Mutex<DataStreamType>>>,
|
||||||
remote_endpoint: String,
|
//remote_endpoint: String,
|
||||||
|
remote_host: String,
|
||||||
|
remote_port: String,
|
||||||
ds_inner_m: Arc<Mutex<DownStreamInner>>,
|
ds_inner_m: Arc<Mutex<DownStreamInner>>,
|
||||||
us_inner_m: Arc<Mutex<UpStreamInner>>,
|
us_inner_m: Arc<Mutex<UpStreamInner>>,
|
||||||
inner_handlers: InnerHandlers<H>,
|
inner_handlers: InnerHandlers<H>,
|
||||||
|
|
@ -335,14 +453,15 @@ where
|
||||||
|
|
||||||
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> FullDuplexTcp<H> {
|
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> FullDuplexTcp<H> {
|
||||||
|
|
||||||
pub fn new(ds_tcp_stream: SslStream<TcpStream>, remote_endpoint: String, handlers: InnerHandlers<H>) -> Self {
|
pub fn new(ds_tcp_stream: SslStream<TcpStream>, remote_host: String, remote_port: String, handlers: InnerHandlers<H>) -> Self {
|
||||||
|
|
||||||
let _ = ds_tcp_stream.get_ref().set_read_timeout(Some(Duration::from_millis(50)));
|
let _ = ds_tcp_stream.get_ref().set_read_timeout(Some(Duration::from_millis(50)));
|
||||||
|
|
||||||
FullDuplexTcp {
|
FullDuplexTcp {
|
||||||
ds_tcp_stream: Arc::new(Mutex::new(ds_tcp_stream)),
|
ds_tcp_stream: Arc::new(Mutex::new(ds_tcp_stream)),
|
||||||
us_tcp_stream: None,
|
us_tcp_stream: None,
|
||||||
remote_endpoint,
|
remote_host,
|
||||||
|
remote_port,
|
||||||
ds_inner_m: Arc::new(Mutex::new(DownStreamInner{ds_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
ds_inner_m: Arc::new(Mutex::new(DownStreamInner{ds_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
||||||
us_inner_m: Arc::new(Mutex::new(UpStreamInner{us_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
us_inner_m: Arc::new(Mutex::new(UpStreamInner{us_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
||||||
inner_handlers: handlers,
|
inner_handlers: handlers,
|
||||||
|
|
@ -519,17 +638,15 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
|
|
||||||
let connector = sslbuilder.build();
|
let connector = sslbuilder.build();
|
||||||
|
|
||||||
let s = match TcpStream::connect(self.remote_endpoint.as_str()) {
|
let s = match TcpStream::connect(format!("{}:{}", self.remote_host, self.remote_port)) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.handle_error(format!("Can't connect to remote host: {}\nErr: {}", self.remote_endpoint, e).as_str());
|
self.handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", self.remote_host, self.remote_port), e).as_str());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let r_host: Vec<&str> = self.remote_endpoint.as_str().split(":").collect();
|
let s = match connector.connect(self.remote_host.as_str(), s) {
|
||||||
|
|
||||||
let s = match connector.connect(r_host[0], s) {
|
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.handle_error(format!("Failed to accept TLS/SSL handshake: {}", e).as_str());
|
self.handle_error(format!("Failed to accept TLS/SSL handshake: {}", e).as_str());
|
||||||
|
|
|
||||||
46
src/lib.rs
46
src/lib.rs
|
|
@ -1,6 +1,6 @@
|
||||||
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
|
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
|
||||||
use std::net::{TcpListener};
|
use std::net::{TcpListener};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Arc;
|
||||||
use std::{process, thread};
|
use std::{process, thread};
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
|
|
@ -48,7 +48,7 @@ pub struct SSLRelay<H>
|
||||||
where
|
where
|
||||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
||||||
{
|
{
|
||||||
config: Option<RelayConfig>,
|
config: RelayConfig,
|
||||||
handlers: Option<InnerHandlers<H>>,
|
handlers: Option<InnerHandlers<H>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -62,28 +62,40 @@ where
|
||||||
|
|
||||||
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> SSLRelay<H> {
|
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> SSLRelay<H> {
|
||||||
|
|
||||||
pub fn new(handlers: H) -> Self {
|
pub fn new(handlers: H, config_path: ConfigType<String>) -> Self {
|
||||||
|
|
||||||
SSLRelay {
|
SSLRelay {
|
||||||
config: None,
|
config: Self::load_relay_config(config_path),
|
||||||
handlers: Some(InnerHandlers{cb: handlers}),
|
handlers: Some(InnerHandlers{cb: handlers}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn load_config(&mut self, config_path: ConfigType<String>) {
|
|
||||||
self.config = Some(self.load_relay_config(config_path));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(&mut self) {
|
pub fn start(&mut self) {
|
||||||
|
|
||||||
let rc_pointer = Arc::new(Mutex::new(self.config.as_ref().unwrap().clone()));
|
/* For UpStream and DownStream TCP data type separation. It should probably start here.
|
||||||
|
* start() will setup anything that needs to be setup before starting the listener.
|
||||||
|
* Once everything is initialized it will then call another handle function that is
|
||||||
|
* decided based upon the DS/US options.
|
||||||
|
* Basically this method is decided from down stream options
|
||||||
|
* If DS is set to RAW mode then it will call the RAW TCP data mode handler vice versa.
|
||||||
|
* The stream type will be wrapped in a mode type.
|
||||||
|
* DataStreamType::TLS(SslStream<TcpStream>) or DataStreamType::RAW(TcpStream)
|
||||||
|
* This will be passed into the Full Duplex TCP simulator object and those
|
||||||
|
* methods within will decide which handler is called for each specific stream.
|
||||||
|
* WILL ONLY DECIDE HANDLER IF CANT DO GENERIC DATA TYPING WITH TRAITS
|
||||||
|
* The remote / UpStream data type will be decided by another DataStreamType variant and passed separately
|
||||||
|
* into the Full Duplex TCP simulator.
|
||||||
|
|
||||||
let rhost = rc_pointer.lock().unwrap().remote_host.clone();
|
* Maybe streams passed into FDTCP simulator object could be typed as Trait requirements instead of a strict type?
|
||||||
let rport = rc_pointer.lock().unwrap().remote_port.clone();
|
* Like Write/Read etc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
let rhost = self.config.remote_host.clone();
|
||||||
|
let rport = self.config.remote_port.clone();
|
||||||
let remote_endpoint = format!("{}:{}", rhost, rport);
|
let remote_endpoint = format!("{}:{}", rhost, rport);
|
||||||
|
|
||||||
let acceptor = self.setup_ssl_config(self.config.as_ref().unwrap().ssl_private_key_path.clone(), self.config.as_ref().unwrap().ssl_cert_path.clone());
|
let acceptor = self.setup_ssl_config(self.config.ssl_private_key_path.clone(), self.config.ssl_cert_path.clone());
|
||||||
let listener = TcpListener::bind(format!("{}:{}", self.config.as_ref().unwrap().bind_host.clone(), self.config.as_ref().unwrap().bind_port.clone())).unwrap();
|
let listener = TcpListener::bind(format!("{}:{}", self.config.bind_host.clone(), self.config.bind_port.clone())).unwrap();
|
||||||
|
|
||||||
for stream in listener.incoming() {
|
for stream in listener.incoming() {
|
||||||
|
|
||||||
|
|
@ -91,16 +103,18 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
|
|
||||||
let acceptor = acceptor.clone();
|
let acceptor = acceptor.clone();
|
||||||
//let rc_config = rc_pointer.clone();
|
|
||||||
let handler_clone = self.handlers.as_ref().unwrap().clone();
|
let handler_clone = self.handlers.as_ref().unwrap().clone();
|
||||||
let r_endpoint = remote_endpoint.clone();
|
let r_endpoint = remote_endpoint.clone();
|
||||||
|
|
||||||
|
let r_host = rhost.clone();
|
||||||
|
let r_port = rport.clone();
|
||||||
|
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
|
|
||||||
match acceptor.accept(stream) {
|
match acceptor.accept(stream) {
|
||||||
Ok(stream) => {
|
Ok(stream) => {
|
||||||
// FULL DUPLEX OBJECT CREATION HERE
|
// FULL DUPLEX OBJECT CREATION HERE
|
||||||
FullDuplexTcp::new(stream, r_endpoint, handler_clone).handle();
|
FullDuplexTcp::new(stream, r_host, r_port, handler_clone).handle();
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
||||||
|
|
@ -114,7 +128,7 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_relay_config(&self, config_path: ConfigType<String>) -> RelayConfig {
|
fn load_relay_config(config_path: ConfigType<String>) -> RelayConfig {
|
||||||
|
|
||||||
let mut resolved_path = String::from("./relay_config.toml");
|
let mut resolved_path = String::from("./relay_config.toml");
|
||||||
match config_path {
|
match config_path {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue