Data handling completely rebuilt. Now correctly handles all TCP/IP traffic.
This commit is contained in:
parent
1aa97d6493
commit
f19c7d8f2a
12 changed files with 493 additions and 604 deletions
489
src/data.rs
489
src/data.rs
|
|
@ -1,141 +1,432 @@
|
|||
use std::time::Duration;
|
||||
use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
|
||||
use std::net::TcpStream;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::net::{TcpStream, Shutdown};
|
||||
use std::sync::mpsc::{self, Receiver, Sender};
|
||||
use std::thread;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[derive(PartialEq)]
|
||||
pub enum StreamDirection {
|
||||
Upstream,// Data coming from remote host
|
||||
DownStream,// Data coming from origin host
|
||||
use crate::{HandlerCallbacks, InnerHandlers};
|
||||
|
||||
enum FullDuplexTcpState {
|
||||
DownStreamRead,
|
||||
DownStreamWrite(Vec<u8>),
|
||||
UpStreamRead,
|
||||
UpStreamWrite(Vec<u8>),
|
||||
DownStreamShutDown,
|
||||
UpStreamShutDown,
|
||||
}
|
||||
|
||||
pub struct DataHandler {
|
||||
pub tcp_stream: Option<SslStream<TcpStream>>,
|
||||
relay_stream: Option<SslStream<TcpStream>>,
|
||||
remote_host: String,
|
||||
pub stream_direction: StreamDirection,
|
||||
enum DataPipe {
|
||||
DataWrite(Vec<u8>),
|
||||
Finished,
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
impl DataHandler {
|
||||
struct DownStreamInner {
|
||||
ds_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
||||
internal_data_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
pub fn new(tcp_stream: SslStream<TcpStream>, remote_host: String) -> Self {
|
||||
let _ = tcp_stream.get_ref().set_read_timeout(Some(Duration::from_millis(100)));
|
||||
DataHandler {
|
||||
tcp_stream: Some(tcp_stream),
|
||||
relay_stream: None,
|
||||
remote_host,
|
||||
stream_direction: StreamDirection::DownStream,
|
||||
impl DownStreamInner {
|
||||
pub fn ds_handler(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
||||
|
||||
loop {
|
||||
|
||||
match data_in.recv_timeout(Duration::from_millis(100)) {
|
||||
|
||||
// DataPipe Received
|
||||
Ok(data_received) => {
|
||||
|
||||
match data_received {
|
||||
DataPipe::DataWrite(data) => {
|
||||
|
||||
let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() {
|
||||
Ok(sl) => sl,
|
||||
Err(_e) => {
|
||||
println!("[!] Failed to get stream lock!");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match stream_lock.write_all(&data) {
|
||||
Ok(()) => {},
|
||||
Err(_e) => {
|
||||
println!("[!] Failed to write data to DownStream tcp stream!");
|
||||
}
|
||||
}
|
||||
let _ = stream_lock.flush();
|
||||
drop(stream_lock);
|
||||
|
||||
let _ = data_out.send(FullDuplexTcpState::DownStreamRead);
|
||||
},
|
||||
DataPipe::Shutdown => {
|
||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||
return;
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
Err(_e) => {
|
||||
match _e {
|
||||
mpsc::RecvTimeoutError::Timeout => {},
|
||||
mpsc::RecvTimeoutError::Disconnected => {
|
||||
println!("[!] DownStream data_in channel is disconnected!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}// End of data_in receive
|
||||
|
||||
// If received data
|
||||
if let Some(byte_count) = self.get_data_stream() {
|
||||
if byte_count > 0 {
|
||||
|
||||
let _ = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone()));
|
||||
if let DataPipe::Finished = data_in.recv().unwrap() {
|
||||
self.internal_data_buffer.clear();
|
||||
continue;
|
||||
} else {
|
||||
println!("[!] Could not receive DataPipe::Finished notifier!");
|
||||
}
|
||||
|
||||
} else if byte_count == 0 {
|
||||
|
||||
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
|
||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||
return;
|
||||
} else if byte_count == -1 {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_data_stream(&mut self, data: &mut Vec<u8>) -> usize {
|
||||
fn get_data_stream(&mut self) -> Option<i64> {
|
||||
|
||||
let mut data_length: usize = 0;
|
||||
let mut data_length: i64 = 0;
|
||||
|
||||
match self.stream_direction {
|
||||
loop {
|
||||
|
||||
StreamDirection::DownStream => {
|
||||
let mut r_buf = [0; 1024];
|
||||
|
||||
loop {
|
||||
match self.ds_stream.as_mut().unwrap().lock().unwrap().read(&mut r_buf) {
|
||||
|
||||
let mut r_buf = [0; 1024];
|
||||
Ok(bytes_read) => {
|
||||
|
||||
match self.tcp_stream.as_mut().unwrap().read(&mut r_buf) {
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
|
||||
Ok(bytes_read) => {
|
||||
} else if bytes_read != 0 && bytes_read <= 1024 {
|
||||
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
let mut tmp_buf = r_buf.to_vec();
|
||||
tmp_buf.truncate(bytes_read);
|
||||
|
||||
} else if bytes_read != 0 && bytes_read <= 1024 {
|
||||
let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap();
|
||||
data_length += bytes_read as i64;
|
||||
|
||||
let mut tmp_buf = r_buf.to_vec();
|
||||
tmp_buf.truncate(bytes_read);
|
||||
} else {
|
||||
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
|
||||
let _bw = data.write(&tmp_buf).unwrap();
|
||||
data_length += bytes_read;
|
||||
io::ErrorKind::WouldBlock => {
|
||||
if data_length == 0 {
|
||||
|
||||
} else {
|
||||
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
|
||||
data_length = -1;
|
||||
}
|
||||
break;
|
||||
|
||||
},
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
io::ErrorKind::WouldBlock => {
|
||||
break;
|
||||
},
|
||||
_ => {println!("[!!!] Got error: {}",e);}
|
||||
_ => {println!("[!!!] Got error: {}",e);}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
return Some(data_length);
|
||||
}
|
||||
}
|
||||
|
||||
struct UpStreamInner{
|
||||
us_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
||||
internal_data_buffer: Vec<u8>
|
||||
}
|
||||
|
||||
impl UpStreamInner {
|
||||
pub fn us_handler(&mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
|
||||
|
||||
loop {
|
||||
|
||||
match data_in.recv_timeout(Duration::from_millis(100)) {
|
||||
|
||||
Ok(data_received) => {
|
||||
|
||||
match data_received {
|
||||
DataPipe::DataWrite(data) => {
|
||||
|
||||
let mut stream_lock = match self.us_stream.as_ref().unwrap().lock() {
|
||||
Ok(sl) => sl,
|
||||
Err(_e) => {
|
||||
println!("[!] Failed to get stream lock!");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match stream_lock.write_all(&data) {
|
||||
Ok(()) => {},
|
||||
Err(_e) => {
|
||||
println!("[!] Failed to write data to DownStream tcp stream!");
|
||||
}
|
||||
}
|
||||
let _ = stream_lock.flush();
|
||||
drop(stream_lock);
|
||||
|
||||
let _ = data_out.send(FullDuplexTcpState::UpStreamRead);
|
||||
},
|
||||
DataPipe::Shutdown => {
|
||||
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||
return;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
match e {
|
||||
mpsc::RecvTimeoutError::Timeout => {},
|
||||
mpsc::RecvTimeoutError::Disconnected => {
|
||||
println!("[!] UpStream data_in channel is disconnected!");
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
StreamDirection::Upstream => {
|
||||
loop {
|
||||
}// End of data_in receive
|
||||
|
||||
let mut r_buf = [0; 1024];
|
||||
if let Some(byte_count) = self.get_data_stream() {
|
||||
if byte_count > 0 {
|
||||
|
||||
match self.relay_stream.as_mut().unwrap().read(&mut r_buf) {
|
||||
|
||||
Ok(bytes_read) => {
|
||||
|
||||
if bytes_read == 0 {
|
||||
break;
|
||||
|
||||
} else if bytes_read != 0 && bytes_read <= 1024 {
|
||||
|
||||
let mut tmp_buf = r_buf.to_vec();
|
||||
tmp_buf.truncate(bytes_read);
|
||||
|
||||
let _bw = data.write(&tmp_buf).unwrap();
|
||||
data_length += bytes_read;
|
||||
|
||||
} else {
|
||||
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
io::ErrorKind::WouldBlock => {
|
||||
break;
|
||||
},
|
||||
_ => {println!("[!!!] Got error: {}",e);}
|
||||
}
|
||||
},
|
||||
let _ = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone()));
|
||||
if let DataPipe::Finished = data_in.recv().unwrap() {
|
||||
self.internal_data_buffer.clear();
|
||||
continue;
|
||||
} else {
|
||||
println!("[!] Could not receive DataPipe::Finished notifier!");
|
||||
}
|
||||
|
||||
} else if byte_count == 0 {
|
||||
|
||||
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
|
||||
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||
return;
|
||||
} else if byte_count == -1 {
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_data_stream(&mut self) -> Option<i64> {
|
||||
|
||||
let mut data_length: i64 = 0;
|
||||
|
||||
loop {
|
||||
|
||||
let mut r_buf = [0; 1024];
|
||||
|
||||
match self.us_stream.as_mut().unwrap().lock().unwrap().read(&mut r_buf) {
|
||||
|
||||
Ok(bytes_read) => {
|
||||
|
||||
if bytes_read == 0 {
|
||||
|
||||
break;
|
||||
|
||||
} else if bytes_read != 0 && bytes_read <= 1024 {
|
||||
|
||||
let mut tmp_buf = r_buf.to_vec();
|
||||
tmp_buf.truncate(bytes_read);
|
||||
|
||||
let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap();
|
||||
data_length += bytes_read as i64;
|
||||
|
||||
} else {
|
||||
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
|
||||
io::ErrorKind::WouldBlock => {
|
||||
if data_length == 0 {
|
||||
data_length = -1;
|
||||
}
|
||||
break;
|
||||
},
|
||||
_ => {println!("[!!!] Got error: {}",e);}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
return Some(data_length);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FullDuplexTcp<H>
|
||||
where
|
||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
||||
{
|
||||
ds_tcp_stream: Arc<Mutex<SslStream<TcpStream>>>,
|
||||
us_tcp_stream: Option<Arc<Mutex<SslStream<TcpStream>>>>,
|
||||
remote_endpoint: String,
|
||||
ds_inner_m: Arc<Mutex<DownStreamInner>>,
|
||||
us_inner_m: Arc<Mutex<UpStreamInner>>,
|
||||
inner_handlers: InnerHandlers<H>,
|
||||
}
|
||||
|
||||
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> FullDuplexTcp<H> {
|
||||
|
||||
pub fn new(ds_tcp_stream: SslStream<TcpStream>, remote_endpoint: String, handlers: InnerHandlers<H>) -> Self {
|
||||
|
||||
let _ = ds_tcp_stream.get_ref().set_read_timeout(Some(Duration::from_millis(100)));
|
||||
|
||||
FullDuplexTcp {
|
||||
ds_tcp_stream: Arc::new(Mutex::new(ds_tcp_stream)),
|
||||
us_tcp_stream: None,
|
||||
remote_endpoint,
|
||||
ds_inner_m: Arc::new(Mutex::new(DownStreamInner{ds_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
||||
us_inner_m: Arc::new(Mutex::new(UpStreamInner{us_stream: None, internal_data_buffer: Vec::<u8>::new()})),
|
||||
inner_handlers: handlers,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle(&mut self) {
|
||||
|
||||
if self.connect_endpoint() == -1 {
|
||||
let _ = self.ds_tcp_stream.lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||
return;
|
||||
}
|
||||
|
||||
let (state_sender, state_receiver): (Sender<FullDuplexTcpState>, Receiver<FullDuplexTcpState>) = mpsc::channel();
|
||||
let (ds_data_pipe_sender, ds_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) = mpsc::channel();
|
||||
let (us_data_pipe_sender, us_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) = mpsc::channel();
|
||||
|
||||
self.ds_inner_m.lock().unwrap().ds_stream = Some(self.ds_tcp_stream.clone());
|
||||
let ds_method_pointer = self.ds_inner_m.clone();
|
||||
let ds_state_bc = state_sender.clone();
|
||||
|
||||
self.us_inner_m.lock().unwrap().us_stream = Some(self.us_tcp_stream.as_ref().unwrap().clone());
|
||||
let us_method_pointer = self.us_inner_m.clone();
|
||||
let us_state_bc = state_sender.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
ds_method_pointer.lock().unwrap().ds_handler(ds_state_bc, ds_data_pipe_receiver);
|
||||
});
|
||||
|
||||
thread::spawn(move || {
|
||||
us_method_pointer.lock().unwrap().us_handler(us_state_bc, us_data_pipe_receiver);
|
||||
});
|
||||
|
||||
loop {
|
||||
|
||||
match state_receiver.recv() {
|
||||
Ok(state_request) => {
|
||||
match state_request {
|
||||
|
||||
// DownStream Write Request
|
||||
FullDuplexTcpState::DownStreamWrite(mut data) => {
|
||||
|
||||
/*
|
||||
Callbacks that work with data from UpStream go here
|
||||
*/
|
||||
|
||||
let inner_handlers_clone = self.inner_handlers.clone();
|
||||
let in_data = data.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
inner_handlers_clone.cb.us_nb_callback(in_data);
|
||||
});
|
||||
|
||||
self.inner_handlers.cb.us_b_callback(&mut data);
|
||||
let _ = ds_data_pipe_sender.send(DataPipe::DataWrite(data));
|
||||
|
||||
match state_receiver.recv().unwrap() {
|
||||
FullDuplexTcpState::DownStreamRead => {
|
||||
let _ = us_data_pipe_sender.send(DataPipe::Finished);
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
// UpStream Write Request
|
||||
FullDuplexTcpState::UpStreamWrite(mut data) => {
|
||||
|
||||
/*
|
||||
Callbacks that work with data from DownStream go here
|
||||
*/
|
||||
|
||||
let inner_handlers_clone = self.inner_handlers.clone();
|
||||
let in_data = data.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
inner_handlers_clone.cb.ds_nb_callback(in_data);
|
||||
});
|
||||
|
||||
self.inner_handlers.cb.ds_b_callback(&mut data);
|
||||
let _ = us_data_pipe_sender.send(DataPipe::DataWrite(data));
|
||||
|
||||
match state_receiver.recv().unwrap() {
|
||||
FullDuplexTcpState::UpStreamRead => {
|
||||
let _ = ds_data_pipe_sender.send(DataPipe::Finished);
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
// DownStreamShutDown Request
|
||||
FullDuplexTcpState::DownStreamShutDown => {
|
||||
let _ = us_data_pipe_sender.send(DataPipe::Shutdown);
|
||||
return;
|
||||
},
|
||||
// UpStreamShutDown Request
|
||||
FullDuplexTcpState::UpStreamShutDown => {
|
||||
let _ = ds_data_pipe_sender.send(DataPipe::Shutdown);
|
||||
return;
|
||||
},
|
||||
_ => {}
|
||||
}
|
||||
},
|
||||
Err(_e) => {
|
||||
println!("[!] State receiver communication channel has closed!");
|
||||
}
|
||||
}
|
||||
}
|
||||
return data_length;
|
||||
}
|
||||
|
||||
fn connect_endpoint(&mut self) -> i8 {
|
||||
|
||||
pub fn relay_data(&mut self, data: &Vec<u8>) -> Option<i8> {
|
||||
let mut sslbuilder = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
sslbuilder.set_verify(SslVerifyMode::NONE);
|
||||
|
||||
let mut retries = 3;
|
||||
loop {
|
||||
let connector = sslbuilder.build();
|
||||
|
||||
let mut sslbuild = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
sslbuild.set_verify(SslVerifyMode::NONE);
|
||||
let connector = sslbuild.build();
|
||||
let stream = TcpStream::connect(&self.remote_host).unwrap();
|
||||
let _ = stream.set_read_timeout(Some(Duration::from_millis(500)));
|
||||
let mut stream = match connector.connect(&self.remote_host, stream) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
println!("[Error] {}", e);
|
||||
if retries == 0 {
|
||||
println!("[!] Request relay retries: 0");
|
||||
return None;
|
||||
}
|
||||
retries -= 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let s = match TcpStream::connect(self.remote_endpoint.as_str()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
println!("[!] Can't connect to remote host: {}\nErr: {}", self.remote_endpoint, e);
|
||||
return -1;
|
||||
}
|
||||
};
|
||||
|
||||
stream.write_all(&data).unwrap();
|
||||
let _ = stream.flush();
|
||||
self.relay_stream = Some(stream);
|
||||
return Some(0);
|
||||
}
|
||||
let r_host: Vec<&str> = self.remote_endpoint.as_str().split(":").collect();
|
||||
|
||||
let s = connector.connect(r_host[0], s).unwrap();
|
||||
|
||||
self.us_tcp_stream = Some(
|
||||
Arc::new(
|
||||
Mutex::new(
|
||||
s
|
||||
)));
|
||||
let _ = self.us_tcp_stream.as_ref().unwrap().lock().unwrap().get_ref().set_read_timeout(Some(Duration::from_millis(100)));
|
||||
return 0;
|
||||
}
|
||||
} // DataHandler
|
||||
}
|
||||
312
src/http.rs
312
src/http.rs
|
|
@ -1,312 +0,0 @@
|
|||
use httparse::{self, Header};
|
||||
use chunked_transfer::Decoder;
|
||||
use flate2::read::GzDecoder;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::io::Read;
|
||||
|
||||
pub struct RelayedResponse<'a> {
|
||||
http_version: Option<u8>,
|
||||
http_code: Option<u16>,
|
||||
http_reason: Option<String>,
|
||||
http_headers: Option<Vec<Header<'a>>>,
|
||||
http_body: Option<String>,
|
||||
}
|
||||
|
||||
pub struct RelayRequest<'a> {
|
||||
http_method: Option<String>,
|
||||
http_path: Option<String>,
|
||||
http_version: Option<u8>,
|
||||
http_headers: Option<Vec<Header<'a>>>,
|
||||
http_body: Option<String>,
|
||||
}
|
||||
|
||||
/* Unused HTTP helper functions
|
||||
pub fn get_req_headers<'a>(data: &'a Vec<u8>) -> Option<Vec<Header<'a>>> {
|
||||
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
let mut request = httparse::Request::new(&mut headers);
|
||||
|
||||
let req = request.parse(data).unwrap();
|
||||
|
||||
if req.is_complete() {
|
||||
return Some(headers.to_vec());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn get_res_headers(data: &Vec<u8>) -> Option<Vec<Header>> {
|
||||
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
let mut response = httparse::Response::new(&mut headers);
|
||||
|
||||
let res = response.parse(data).unwrap();
|
||||
|
||||
if res.is_complete() {
|
||||
return Some(headers.to_vec());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn get_host(data: &Vec<u8>) -> Option<String> {
|
||||
|
||||
return Some("137.220.37.67".to_string());
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
|
||||
let mut request = httparse::Request::new(&mut headers);
|
||||
|
||||
let req = request.parse(data).unwrap();
|
||||
|
||||
if req.is_complete() {
|
||||
for header in headers.iter() {
|
||||
if header.name == "Host" || header.name == "host" {
|
||||
//println!("Host -> {}", String::from_utf8(header.value.to_vec()).unwrap());
|
||||
return Some(String::from_utf8(header.value.to_vec()).unwrap());
|
||||
}
|
||||
}
|
||||
None
|
||||
} else if req.is_partial() {
|
||||
for header in headers.iter() {
|
||||
if header.name == "Host" || header.name == "host" {
|
||||
//println!("Host -> {}", String::from_utf8(header.value.to_vec()).unwrap());
|
||||
return Some(String::from_utf8(header.value.to_vec()).unwrap());
|
||||
}
|
||||
}
|
||||
None
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_cookie(data: &Vec<u8>) -> Option<String> {
|
||||
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
|
||||
let mut request = httparse::Request::new(&mut headers);
|
||||
|
||||
let req = request.parse(data).unwrap();
|
||||
|
||||
if req.is_complete() {
|
||||
for header in headers.iter() {
|
||||
if header.name == "Cookie" || header.name == "cookie" {
|
||||
//println!("Host -> {}", String::from_utf8(header.value.to_vec()).unwrap());
|
||||
return Some(String::from_utf8(header.value.to_vec()).unwrap());
|
||||
}
|
||||
}
|
||||
None
|
||||
} else if req.is_partial() {
|
||||
for header in headers.iter() {
|
||||
if header.name == "Cookie" || header.name == "cookie" {
|
||||
//println!("Host -> {}", String::from_utf8(header.value.to_vec()).unwrap());
|
||||
return Some(String::from_utf8(header.value.to_vec()).unwrap());
|
||||
}
|
||||
}
|
||||
None
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
*/
|
||||
pub fn http_req_verbose(data: &Vec<u8>, mode: u8) {
|
||||
|
||||
let req_info = get_request(&data);
|
||||
let req_header_list = req_info.http_headers.unwrap();
|
||||
let mut req_header_string = String::new();
|
||||
|
||||
for header in req_header_list {
|
||||
if header.value.to_vec().len() == 0 {continue;}
|
||||
req_header_string.push_str(format!("[-->] {}: {}\n", header.name, String::from_utf8(header.value.to_vec()).unwrap()).as_str());
|
||||
}
|
||||
if mode == 1 {
|
||||
println!("================================\n[-->] HTTP Version: {}\n[-->] HTTP Method: {} {}\n[-->] HTTP Headers:\n{}\n[-->] HTTP Body:\n{}\n"
|
||||
,req_info.http_version.unwrap()
|
||||
,req_info.http_method.unwrap()
|
||||
,req_info.http_path.unwrap()
|
||||
,req_header_string
|
||||
,req_info.http_body.unwrap()
|
||||
);
|
||||
} else if mode == 2 {
|
||||
println!("[Req] {} {}", req_info.http_method.unwrap(), req_info.http_path.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn http_res_verbose(response_data: &Vec<u8>, mode: u8) {
|
||||
|
||||
let res_info = get_response(&response_data);
|
||||
let header_list = res_info.http_headers.unwrap();
|
||||
let mut header_string = String::new();
|
||||
|
||||
for header in header_list {
|
||||
if header.value.to_vec().len() == 0 {continue;}
|
||||
header_string.push_str(format!("[<--] {}: {}\n", header.name, String::from_utf8(header.value.to_vec()).unwrap()).as_str());
|
||||
}
|
||||
if mode == 1 {
|
||||
println!("\n[<--] HTTP Version: {}\n[<--] HTTP Code: {} {}\n[<--] HTTP Headers:\n{}\n[<--] HTTP Body:\n{}\n================================\n"
|
||||
,res_info.http_version.unwrap()
|
||||
,res_info.http_code.unwrap()
|
||||
,res_info.http_reason.unwrap()
|
||||
,header_string
|
||||
,res_info.http_body.unwrap()
|
||||
);
|
||||
} else if mode == 2 {
|
||||
println!("[Res] {} {}", res_info.http_code.unwrap(), res_info.http_reason.unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_response<'a>(data: &'a Vec<u8>) -> RelayedResponse<'a> {
|
||||
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
let mut res = httparse::Response::new(&mut headers);
|
||||
|
||||
let res_chk = res.parse(data);
|
||||
|
||||
loop {
|
||||
|
||||
if !res_chk.unwrap().is_complete() {
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
continue;
|
||||
}
|
||||
|
||||
let reason = match res.reason {
|
||||
Some(r) => Some(r.to_string()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let header_vec = res.headers.to_vec();
|
||||
|
||||
let body = get_http_body(&data, header_vec).unwrap_or(String::from("[-] Failed to get http response body!").into_bytes());
|
||||
let string_body = String::from_utf8(body).unwrap();
|
||||
|
||||
return RelayedResponse {
|
||||
http_version: res.version,
|
||||
http_code: res.code,
|
||||
http_reason: reason,
|
||||
http_headers: Some(headers.to_vec()),
|
||||
http_body: Some(string_body),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_request<'a>(data: &'a Vec<u8>) -> RelayRequest<'a> {
|
||||
|
||||
let mut headers = [httparse::EMPTY_HEADER; 128];
|
||||
let mut req = httparse::Request::new(&mut headers);
|
||||
|
||||
let req_chk = req.parse(data);
|
||||
|
||||
loop {
|
||||
|
||||
if !req_chk.unwrap().is_complete() {
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
continue;
|
||||
}
|
||||
|
||||
let method = match req.method {
|
||||
Some(r) => Some(r.to_string()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let path = match req.path {
|
||||
Some(p) => Some(p.to_string()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let header_vec = req.headers.to_vec();
|
||||
|
||||
let body = get_http_body(&data, header_vec).unwrap_or(String::from("[-] Failed to get http request body!").into_bytes());
|
||||
let string_body = String::from_utf8(body).unwrap();
|
||||
|
||||
return RelayRequest {
|
||||
http_method: method,
|
||||
http_path: path,
|
||||
http_version: req.version,
|
||||
http_headers: Some(headers.to_vec()),
|
||||
http_body: Some(string_body),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_http_body(data: &Vec<u8>, headers: Vec<Header>) -> Option<Vec<u8>> {
|
||||
|
||||
let mut cl: u64 = 0;
|
||||
let encoding_check = headers.clone();
|
||||
for header in headers {
|
||||
|
||||
if header.name == "Content-Length" || header.name == "content-length" {
|
||||
let length = match String::from_utf8(header.value.to_vec()) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
println!("[!] ERROR: {}",e);
|
||||
return None;
|
||||
},
|
||||
};
|
||||
|
||||
cl += length.parse::<u64>().unwrap();
|
||||
let mut r_body = data.to_vec();
|
||||
r_body.reverse();
|
||||
r_body.truncate(cl as usize);
|
||||
r_body.reverse();
|
||||
|
||||
return Some(r_body);
|
||||
|
||||
} else if header.name == "Transfer-Encoding" || header.name == "transfer-encoding" {
|
||||
|
||||
let te_string = String::from_utf8(header.value.to_vec()).unwrap();
|
||||
|
||||
if te_string == "chunked" {
|
||||
|
||||
let mut new_vec = data.to_vec();
|
||||
let new_vec_iter = new_vec.to_vec();
|
||||
let mut new_vec_iter = new_vec_iter.iter();
|
||||
let mut i = 0;
|
||||
|
||||
loop {
|
||||
if let Some(&0x0d) = new_vec_iter.next() {
|
||||
i += 1;
|
||||
if let Some(&0x0a) = new_vec_iter.next() {
|
||||
i += 1;
|
||||
if let Some(&0x0d) = new_vec_iter.next() {
|
||||
i += 1;
|
||||
if let Some(&0x0a) = new_vec_iter.next() {
|
||||
i += 1;
|
||||
break;
|
||||
} else {i += 1;}
|
||||
} else {i += 1;}
|
||||
} else {i += 1;}
|
||||
} else {i += 1;}
|
||||
}
|
||||
|
||||
new_vec.reverse();
|
||||
new_vec.truncate(data.len() - i);
|
||||
new_vec.reverse();
|
||||
|
||||
let mut decoder = Decoder::new(new_vec.as_slice());
|
||||
let mut blob = Vec::new();
|
||||
let _ = decoder.read_to_end(&mut blob);
|
||||
|
||||
let mut g_encoded = false;
|
||||
for header in encoding_check {
|
||||
if header.name == "Content-Encoding" || header.name == "content-encoding" {
|
||||
let s = String::from_utf8(header.value.to_vec()).unwrap();
|
||||
if s == "gzip" {
|
||||
g_encoded = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if g_encoded {
|
||||
let mut gzd = GzDecoder::new(&blob[..]);
|
||||
let mut unzipped = Vec::new();
|
||||
match gzd.read_to_end(&mut unzipped) {
|
||||
Ok(_) => {},
|
||||
Err(_) => return None,
|
||||
}
|
||||
|
||||
return Some(unzipped);
|
||||
} else {
|
||||
return Some(blob);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Some(String::from("None").into_bytes());
|
||||
}
|
||||
127
src/lib.rs
127
src/lib.rs
|
|
@ -1,6 +1,5 @@
|
|||
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod, SslStream};
|
||||
use std::io::Write;
|
||||
use std::net::{TcpListener, TcpStream};
|
||||
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
|
||||
use std::net::{TcpListener};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::{process, thread};
|
||||
use std::env;
|
||||
|
|
@ -10,10 +9,7 @@ use std::path::Path;
|
|||
use toml::Value as TValue;
|
||||
|
||||
mod data;
|
||||
use data::{DataHandler, StreamDirection};
|
||||
|
||||
mod http;
|
||||
use http as http_helper;
|
||||
use data::FullDuplexTcp;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RelayConfig {
|
||||
|
|
@ -23,7 +19,6 @@ pub struct RelayConfig {
|
|||
pub remote_port: String,
|
||||
pub ssl_private_key_path: String,
|
||||
pub ssl_cert_path: String,
|
||||
pub verbose_level: i8,
|
||||
}
|
||||
|
||||
pub trait HandlerCallbacks {
|
||||
|
|
@ -43,19 +38,27 @@ pub enum ConfigType<T> {
|
|||
#[derive(Clone)]
|
||||
pub struct SSLRelay<H>
|
||||
where
|
||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static,
|
||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
||||
{
|
||||
config: Option<RelayConfig>,
|
||||
handlers: Option<H>,
|
||||
handlers: Option<InnerHandlers<H>>,
|
||||
}
|
||||
|
||||
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLRelay<H> {
|
||||
#[derive(Clone)]
|
||||
pub struct InnerHandlers<H>
|
||||
where
|
||||
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
|
||||
{
|
||||
cb: H
|
||||
}
|
||||
|
||||
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> SSLRelay<H> {
|
||||
|
||||
pub fn new(handlers: H) -> Self {
|
||||
|
||||
SSLRelay {
|
||||
config: None,
|
||||
handlers: Some(handlers),
|
||||
handlers: Some(InnerHandlers{cb: handlers}),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -66,7 +69,11 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLR
|
|||
pub fn start(&mut self) {
|
||||
|
||||
let rc_pointer = Arc::new(Mutex::new(self.config.as_ref().unwrap().clone()));
|
||||
let handler_pointer = Arc::new(Mutex::new(self.handlers.take().unwrap()));
|
||||
|
||||
let rhost = rc_pointer.lock().unwrap().remote_host.clone();
|
||||
let rport = rc_pointer.lock().unwrap().remote_port.clone();
|
||||
let remote_endpoint = format!("{}:{}", rhost, rport);
|
||||
|
||||
let acceptor = self.setup_ssl_config(self.config.as_ref().unwrap().ssl_private_key_path.clone(), self.config.as_ref().unwrap().ssl_cert_path.clone());
|
||||
let listener = TcpListener::bind(format!("{}:{}", self.config.as_ref().unwrap().bind_host.clone(), self.config.as_ref().unwrap().bind_port.clone())).unwrap();
|
||||
|
||||
|
|
@ -76,26 +83,23 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLR
|
|||
Ok(stream) => {
|
||||
|
||||
let acceptor = acceptor.clone();
|
||||
let rc_config = rc_pointer.clone();
|
||||
let handler = handler_pointer.clone();
|
||||
//let rc_config = rc_pointer.clone();
|
||||
let handler_clone = self.handlers.as_ref().unwrap().clone();
|
||||
let r_endpoint = remote_endpoint.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
|
||||
match acceptor.accept(stream) {
|
||||
Ok(stream) => {
|
||||
|
||||
handle_stream(stream, rc_config, handler);
|
||||
return 0;
|
||||
// FULL DUPLEX OBJECT CREATION HERE
|
||||
FullDuplexTcp::new(stream, r_endpoint, handler_clone).handle();
|
||||
},
|
||||
Err(e) => {
|
||||
|
||||
println!("[Error] {}", e);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
});
|
||||
/*let stream = acceptor.accept(stream).unwrap();
|
||||
handle_stream(stream, rc_config, handler);*/
|
||||
},
|
||||
Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)}
|
||||
}
|
||||
|
|
@ -134,7 +138,6 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLR
|
|||
let ssl_cert_path = config_parsed["ssl_cert_path"].to_string().replace("\"", "");
|
||||
let remote_host = config_parsed["remote_host"].to_string().replace("\"", "");
|
||||
let remote_port = config_parsed["remote_port"].to_string().replace("\"", "");
|
||||
let verbose_level = config_parsed["verbose_level"].to_string().replace("\"", "").parse().unwrap();
|
||||
|
||||
RelayConfig {
|
||||
bind_host: bind_host.clone(),
|
||||
|
|
@ -143,7 +146,6 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLR
|
|||
ssl_cert_path: ssl_cert_path.clone(),
|
||||
remote_host: remote_host.clone(),
|
||||
remote_port: remote_port.clone(),
|
||||
verbose_level: verbose_level,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -163,81 +165,4 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static> SSLR
|
|||
acceptor.check_private_key().unwrap();
|
||||
Arc::new(acceptor.build())
|
||||
}
|
||||
}// SSLRelay
|
||||
|
||||
/* Rewrite this to handle TCP connections until TCP connection is dropped instead of dropping it */
|
||||
fn handle_stream<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + 'static>(tcp_stream: SslStream<TcpStream>, rc_config: Arc<Mutex<RelayConfig>>, handlers: Arc<Mutex<H>>) {
|
||||
|
||||
let conf_lock = rc_config.lock().unwrap();
|
||||
let remote_host = format!("{}:{}", conf_lock.remote_host, conf_lock.remote_port);
|
||||
let verbose_mode = conf_lock.verbose_level;
|
||||
drop(conf_lock);
|
||||
|
||||
let mut datahandler = DataHandler::new(tcp_stream, remote_host);
|
||||
|
||||
let mut data = Vec::<u8>::new();
|
||||
let mut response_data = Vec::<u8>::new();
|
||||
|
||||
let data_size = datahandler.get_data_stream(&mut data);
|
||||
if data_size == 0 {
|
||||
println!("[!] Got 0 bytes closing tcp stream!");
|
||||
return;
|
||||
}
|
||||
if verbose_mode == 1 {
|
||||
http_helper::http_req_verbose(&data, 1);
|
||||
} else if verbose_mode == 2 {
|
||||
http_helper::http_req_verbose(&data, 2);
|
||||
}
|
||||
|
||||
let handlers_p = handlers.clone();
|
||||
let d = data.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
let handlers_lock = handlers_p.lock().unwrap();
|
||||
handlers_lock.ds_nb_callback(d);
|
||||
drop(handlers_lock);
|
||||
});
|
||||
|
||||
let handlers_p = handlers.clone();
|
||||
let handlers_lock = handlers_p.lock().unwrap();
|
||||
handlers_lock.ds_b_callback(&mut data);
|
||||
drop(handlers_lock);
|
||||
|
||||
match datahandler.relay_data(&data) {
|
||||
Some(_relay_success) => {},
|
||||
None => {
|
||||
println!("[-] relay_data failed!");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Get Upstream Data
|
||||
datahandler.stream_direction = StreamDirection::Upstream;
|
||||
let _response_size = datahandler.get_data_stream(&mut response_data);
|
||||
|
||||
if verbose_mode == 1 {
|
||||
http_helper::http_res_verbose(&response_data, 1);
|
||||
} else if verbose_mode == 2 {
|
||||
http_helper::http_res_verbose(&response_data, 2);
|
||||
}
|
||||
|
||||
// Switch back to DownStream mode to relay data from remote host back to origin host
|
||||
datahandler.stream_direction = StreamDirection::DownStream;
|
||||
|
||||
let handlers_p = handlers.clone();
|
||||
let d = response_data.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
let handlers_lock = handlers_p.lock().unwrap();
|
||||
handlers_lock.us_nb_callback(d);
|
||||
drop(handlers_lock);
|
||||
});
|
||||
|
||||
let handlers_p = handlers.clone();
|
||||
let handlers_lock = handlers_p.lock().unwrap();
|
||||
handlers_lock.us_b_callback(&mut response_data);
|
||||
drop(handlers_lock);
|
||||
|
||||
datahandler.tcp_stream.as_mut().unwrap().write_all(&response_data).unwrap();
|
||||
let _ = datahandler.tcp_stream.as_mut().unwrap().flush();
|
||||
}
|
||||
}// SSLRelay
|
||||
Loading…
Add table
Add a link
Reference in a new issue