use tokio::io::{AsyncRead, AsyncReadExt}; pub struct StreamCodec { stream: S, } impl StreamCodec { pub fn new(stream: S) -> Self { Self { stream } } pub async fn next(&mut self) -> Result, std::io::Error> { let mut buf = vec![0; 8]; self.stream.read_exact(&mut buf).await?; let expected_len = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize; if expected_len < 8 || expected_len > 8 * 1024 * 1024 { return Err(std::io::ErrorKind::InvalidData.into()); } buf.resize(expected_len, 0); self.stream.read_exact(&mut buf[8..expected_len]).await?; Ok(buf) } pub fn get_mut(&mut self) -> &mut S { &mut self.stream } } /*#[cfg(test)] mod test { use super::*; use tokio_util::{bytes::BytesMut, codec::Framed}; #[test] fn test_decode() { let stream = futures_util::stream::iter([BytesMut::fr&[0u8]]); let stream = Framed::new(stream, CustomCodec::new()); } }*/