feat: support rustls-0.23, quinn-0.11, and s2n-quic-0.38(unreleased)

This commit is contained in:
Jun Kurihara 2024-05-28 22:35:42 +09:00
commit 53055ab068
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
14 changed files with 51 additions and 960 deletions

3
.gitmodules vendored
View file

@ -1,3 +1,6 @@
[submodule "submodules/rusty-http-cache-semantics"]
path = submodules/rusty-http-cache-semantics
url = git@github.com:junkurihara/rusty-http-cache-semantics.git
[submodule "submodules/s2n-quic"]
path = submodules/s2n-quic
url = git@github.com:junkurihara/s2n-quic.git

View file

@ -2,7 +2,10 @@
## 0.8.0 (Unreleased)
## 0.7.1 -- 0.7.3
- Breaking: Support for `rustls`-0.23.x for http/1.1, 2 and 3. No configuration update is needed at this point.
- Breaking: Along with `rustls`, the cert manager was split from `rpxy-lib` and moved to a new inner crate `rpxy-cert`. This change is to make the cert manager reusable for other projects and to support not only static file based certificates but also other types, e.g., dynamic fetching and management via ACME, in the future.
## 0.7.1 -- 0.7.2
- deps and patches

View file

@ -13,8 +13,8 @@ publish.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["http3-quinn", "cache", "rustls-backend"]
# default = ["cache", "rustls-backend"]
# default = ["http3-quinn", "cache", "rustls-backend"]
default = ["http3-s2n", "cache", "rustls-backend"]
http3-quinn = ["rpxy-lib/http3-quinn"]
http3-s2n = ["rpxy-lib/http3-s2n"]
native-tls-backend = ["rpxy-lib/native-tls-backend"]

View file

@ -13,8 +13,8 @@ publish.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
# default = ["sticky-cookie", "cache", "rustls-backend"]
default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"]
# default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"]
http3-s2n = [
"h3",
@ -76,24 +76,28 @@ rpxy-certs = { path = "../rpxy-certs/", default-features = false }
hot_reload = "0.1.5"
rustls = { version = "0.23.8", default-features = false }
tokio-rustls = { version = "0.26.0", features = ["early-data"] }
webpki = "0.22.4"
x509-parser = "0.16.0"
# logging
tracing = { version = "0.1.40" }
# http/3
quinn = { version = "0.11.1", optional = true }
# h3 = { path = "../submodules/h3/h3/", optional = true }
# h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true }
h3 = { version = "0.0.5", optional = true }
h3-quinn = { version = "0.0.6", optional = true }
s2n-quic = { version = "1.37.0", default-features = false, features = [
### TODO: workaround for s2n-quic, waiting for release of s2n-quic-0.38.0
s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic", optional = true, default-features = false, features = [
"provider-tls-rustls",
], optional = true }
s2n-quic-core = { version = "0.37.0", default-features = false, optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true }
s2n-quic-rustls = { version = "0.37.0", optional = true }
] }
s2n-quic-core = { path = "../submodules/s2n-quic/quic/s2n-quic-core", optional = true, default-features = false }
s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls", optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3", optional = true }
# s2n-quic = { version = "1.37.0", default-features = false, features = [
# "provider-tls-rustls",
# ], optional = true }
# s2n-quic-core = { version = "0.37.0", default-features = false, optional = true }
# s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true }
# s2n-quic-rustls = { version = "0.37.0", optional = true }
##########
# for UDP socket wit SO_REUSEADDR when h3 with quinn
socket2 = { version = "0.5.7", features = ["all"], optional = true }

View file

@ -1,91 +0,0 @@
use async_trait::async_trait;
use rustc_hash::FxHashSet as HashSet;
use rustls::{
sign::{any_supported_type, CertifiedKey},
Certificate, OwnedTrustAnchor, PrivateKey,
};
use std::io;
use x509_parser::prelude::*;
#[async_trait]
// Trait to read certs and keys anywhere from KVS, file, sqlite, etc.
pub trait CryptoSource {
type Error;
/// read crypto materials from source
async fn read(&self) -> Result<CertsAndKeys, Self::Error>;
/// Returns true when mutual tls is enabled
fn is_mutual_tls(&self) -> bool;
}
/// Certificates and private keys in rustls loaded from files
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct CertsAndKeys {
pub certs: Vec<Certificate>,
pub cert_keys: Vec<PrivateKey>,
pub client_ca_certs: Option<Vec<Certificate>>,
}
impl CertsAndKeys {
pub fn parse_server_certs_and_keys(&self) -> Result<CertifiedKey, anyhow::Error> {
// for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() {
let signing_key = self
.cert_keys
.iter()
.find_map(|k| {
if let Ok(sk) = any_supported_type(k) {
Some(sk)
} else {
None
}
})
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"Unable to find a valid certificate and key",
)
})?;
Ok(CertifiedKey::new(self.certs.clone(), signing_key))
}
pub fn parse_client_ca_certs(&self) -> Result<(Vec<OwnedTrustAnchor>, HashSet<Vec<u8>>), anyhow::Error> {
let certs = self.client_ca_certs.as_ref().ok_or(anyhow::anyhow!("No client cert"))?;
let owned_trust_anchors: Vec<_> = certs
.iter()
.map(|v| {
// let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap();
let trust_anchor = webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap();
rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
trust_anchor.subject,
trust_anchor.spki,
trust_anchor.name_constraints,
)
})
.collect();
// TODO: SKID is not used currently
let subject_key_identifiers: HashSet<_> = certs
.iter()
.filter_map(|v| {
// retrieve ca key id (subject key id)
let cert = parse_x509_certificate(&v.0).unwrap().1;
let subject_key_ids = cert
.iter_extensions()
.filter_map(|ext| match ext.parsed_extension() {
ParsedExtension::SubjectKeyIdentifier(skid) => Some(skid),
_ => None,
})
.collect::<Vec<_>>();
if !subject_key_ids.is_empty() {
Some(subject_key_ids[0].0.to_owned())
} else {
None
}
})
.collect();
Ok((owned_trust_anchors, subject_key_identifiers))
}
}

View file

@ -1,36 +0,0 @@
mod certs;
mod service;
use crate::{
backend::BackendAppManager,
constants::{CERTS_WATCH_DELAY_SECS, LOAD_CERTS_ONLY_WHEN_UPDATED},
error::RpxyResult,
};
use hot_reload::{ReloaderReceiver, ReloaderService};
use service::CryptoReloader;
use std::sync::Arc;
pub use certs::{CertsAndKeys, CryptoSource};
pub use service::{ServerCrypto, ServerCryptoBase, SniServerCryptoMap};
/// Result type inner of certificate reloader service
type ReloaderServiceResultInner<T> = (
ReloaderService<CryptoReloader<T>, ServerCryptoBase>,
ReloaderReceiver<ServerCryptoBase>,
);
/// Build certificate reloader service
pub(crate) async fn build_cert_reloader<T>(
app_manager: &Arc<BackendAppManager<T>>,
) -> RpxyResult<ReloaderServiceResultInner<T>>
where
T: CryptoSource + Clone + Send + Sync + 'static,
{
let (cert_reloader_service, cert_reloader_rx) = ReloaderService::<
service::CryptoReloader<T>,
service::ServerCryptoBase,
>::new(
app_manager, CERTS_WATCH_DELAY_SECS, !LOAD_CERTS_ONLY_WHEN_UPDATED
)
.await?;
Ok((cert_reloader_service, cert_reloader_rx))
}

View file

@ -1,251 +0,0 @@
use super::certs::{CertsAndKeys, CryptoSource};
use crate::{backend::BackendAppManager, log::*, name_exp::ServerName};
use async_trait::async_trait;
use hot_reload::*;
use rustc_hash::FxHashMap as HashMap;
use rustls::{server::ResolvesServerCertUsingSni, sign::CertifiedKey, RootCertStore, ServerConfig};
use std::sync::Arc;
#[derive(Clone)]
/// Reloader service for certificates and keys for TLS
pub struct CryptoReloader<T>
where
T: CryptoSource,
{
inner: Arc<BackendAppManager<T>>,
}
/// SNI to ServerConfig map type
pub type SniServerCryptoMap = HashMap<ServerName, Arc<ServerConfig>>;
/// SNI to ServerConfig map
pub struct ServerCrypto {
// For Quic/HTTP3, only servers with no client authentication
#[cfg(feature = "http3-quinn")]
pub inner_global_no_client_auth: Arc<ServerConfig>,
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
pub inner_global_no_client_auth: s2n_quic_rustls::Server,
// For TLS over TCP/HTTP2 and 1.1, map of SNI to server_crypto for all given servers
pub inner_local_map: Arc<SniServerCryptoMap>,
}
/// Reloader target for the certificate reloader service
#[derive(Debug, PartialEq, Eq, Clone, Default)]
pub struct ServerCryptoBase {
inner: HashMap<ServerName, CertsAndKeys>,
}
#[async_trait]
impl<T> Reload<ServerCryptoBase> for CryptoReloader<T>
where
T: CryptoSource + Sync + Send,
{
type Source = Arc<BackendAppManager<T>>;
async fn new(source: &Self::Source) -> Result<Self, ReloaderError<ServerCryptoBase>> {
Ok(Self { inner: source.clone() })
}
async fn reload(&self) -> Result<Option<ServerCryptoBase>, ReloaderError<ServerCryptoBase>> {
let mut certs_and_keys_map = ServerCryptoBase::default();
for (server_name_bytes_exp, backend) in self.inner.apps.iter() {
if let Some(crypto_source) = &backend.crypto_source {
let certs_and_keys = crypto_source
.read()
.await
.map_err(|_e| ReloaderError::<ServerCryptoBase>::Reload("Failed to reload cert, key or ca cert"))?;
certs_and_keys_map
.inner
.insert(server_name_bytes_exp.to_owned(), certs_and_keys);
}
}
Ok(Some(certs_and_keys_map))
}
}
impl TryInto<Arc<ServerCrypto>> for &ServerCryptoBase {
type Error = anyhow::Error;
fn try_into(self) -> Result<Arc<ServerCrypto>, Self::Error> {
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
let server_crypto_global = self.build_server_crypto_global()?;
let server_crypto_local_map: SniServerCryptoMap = self.build_server_crypto_local_map()?;
Ok(Arc::new(ServerCrypto {
#[cfg(feature = "http3-quinn")]
inner_global_no_client_auth: Arc::new(server_crypto_global),
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
inner_global_no_client_auth: server_crypto_global,
inner_local_map: Arc::new(server_crypto_local_map),
}))
}
}
impl ServerCryptoBase {
fn build_server_crypto_local_map(&self) -> Result<SniServerCryptoMap, ReloaderError<ServerCryptoBase>> {
let mut server_crypto_local_map: SniServerCryptoMap = HashMap::default();
for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() {
let server_name: String = server_name_bytes_exp.try_into()?;
// Parse server certificates and private keys
let Ok(certified_key): Result<CertifiedKey, _> = certs_and_keys.parse_server_certs_and_keys() else {
warn!("Failed to add certificate for {}", server_name);
continue;
};
let mut resolver_local = ResolvesServerCertUsingSni::new();
let mut client_ca_roots_local = RootCertStore::empty();
// add server certificate and key
if let Err(e) = resolver_local.add(server_name.as_str(), certified_key.to_owned()) {
error!("{}: Failed to read some certificates and keys {}", server_name.as_str(), e)
}
// add client certificate if specified
if certs_and_keys.client_ca_certs.is_some() {
// add client certificate if specified
match certs_and_keys.parse_client_ca_certs() {
Ok((owned_trust_anchors, _subject_key_ids)) => {
client_ca_roots_local.add_trust_anchors(owned_trust_anchors.into_iter());
}
Err(e) => {
warn!("Failed to add client CA certificate for {}: {}", server_name.as_str(), e);
}
}
}
let mut server_config_local = if client_ca_roots_local.is_empty() {
// with no client auth, enable http1.1 -- 3
#[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))]
{
ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(Arc::new(resolver_local))
}
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
{
let mut sc = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(Arc::new(resolver_local));
sc.alpn_protocols = vec![b"h3".to_vec(), b"hq-29".to_vec()]; // TODO: remove hq-29 later?
sc
}
} else {
// with client auth, enable only http1.1 and 2
// let client_certs_verifier = rustls::server::AllowAnyAnonymousOrAuthenticatedClient::new(client_ca_roots);
let client_certs_verifier = rustls::server::AllowAnyAuthenticatedClient::new(client_ca_roots_local);
ServerConfig::builder()
.with_safe_defaults()
.with_client_cert_verifier(Arc::new(client_certs_verifier))
.with_cert_resolver(Arc::new(resolver_local))
};
server_config_local.alpn_protocols.push(b"h2".to_vec());
server_config_local.alpn_protocols.push(b"http/1.1".to_vec());
server_crypto_local_map.insert(server_name_bytes_exp.to_owned(), Arc::new(server_config_local));
}
Ok(server_crypto_local_map)
}
#[cfg(feature = "http3-quinn")]
fn build_server_crypto_global(&self) -> Result<ServerConfig, ReloaderError<ServerCryptoBase>> {
let mut resolver_global = ResolvesServerCertUsingSni::new();
for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() {
let server_name: String = server_name_bytes_exp.try_into()?;
// Parse server certificates and private keys
let Ok(certified_key): Result<CertifiedKey, _> = certs_and_keys.parse_server_certs_and_keys() else {
warn!("Failed to add certificate for {}", server_name);
continue;
};
if certs_and_keys.client_ca_certs.is_none() {
// aggregated server config for no client auth server for http3
if let Err(e) = resolver_global.add(server_name.as_str(), certified_key) {
error!("{}: Failed to read some certificates and keys {}", server_name.as_str(), e)
}
}
}
//////////////
let mut server_crypto_global = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(Arc::new(resolver_global));
//////////////////////////////
server_crypto_global.alpn_protocols = vec![
b"h3".to_vec(),
b"hq-29".to_vec(), // TODO: remove later?
b"h2".to_vec(),
b"http/1.1".to_vec(),
];
Ok(server_crypto_global)
}
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
fn build_server_crypto_global(&self) -> Result<s2n_quic_rustls::Server, ReloaderError<ServerCryptoBase>> {
let mut resolver_global = s2n_quic_rustls::rustls::server::ResolvesServerCertUsingSni::new();
for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() {
let server_name: String = server_name_bytes_exp.try_into()?;
// Parse server certificates and private keys
let Ok(certified_key) = parse_server_certs_and_keys_s2n(certs_and_keys) else {
warn!("Failed to add certificate for {}", server_name);
continue;
};
if certs_and_keys.client_ca_certs.is_none() {
// aggregated server config for no client auth server for http3
if let Err(e) = resolver_global.add(server_name.as_str(), certified_key) {
error!("{}: Failed to read some certificates and keys {}", server_name.as_str(), e)
}
}
}
let alpn = [
b"h3".to_vec(),
b"hq-29".to_vec(), // TODO: remove later?
b"h2".to_vec(),
b"http/1.1".to_vec(),
];
let server_crypto_global = s2n_quic::provider::tls::rustls::Server::builder()
.with_cert_resolver(Arc::new(resolver_global))
.map_err(|e| anyhow::anyhow!(e))?
.with_application_protocols(alpn.iter())
.map_err(|e| anyhow::anyhow!(e))?
.build()
.map_err(|e| anyhow::anyhow!(e))?;
Ok(server_crypto_global)
}
}
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
/// This is workaround for the version difference between rustls and s2n-quic-rustls
fn parse_server_certs_and_keys_s2n(
certs_and_keys: &CertsAndKeys,
) -> Result<s2n_quic_rustls::rustls::sign::CertifiedKey, anyhow::Error> {
let signing_key = certs_and_keys
.cert_keys
.iter()
.find_map(|k| {
let s2n_private_key = s2n_quic_rustls::PrivateKey(k.0.clone());
if let Ok(sk) = s2n_quic_rustls::rustls::sign::any_supported_type(&s2n_private_key) {
Some(sk)
} else {
None
}
})
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidInput, "Unable to find a valid certificate and key"))?;
let certs: Vec<_> = certs_and_keys
.certs
.iter()
.map(|c| s2n_quic_rustls::rustls::Certificate(c.0.clone()))
.collect();
Ok(s2n_quic_rustls::rustls::sign::CertifiedKey::new(certs, signing_key))
}

View file

@ -20,10 +20,10 @@ pub enum RpxyError {
NoDefaultCryptoProvider,
#[error("Failed to build server config: {0}")]
FailedToBuildServerConfig(String),
// #[error("Failed to update server crypto: {0}")]
// FailedToUpdateServerCrypto(String),
// #[error("No server crypto: {0}")]
// NoServerCrypto(String),
#[error("Failed to update server crypto: {0}")]
FailedToUpdateServerCrypto(String),
#[error("No server crypto: {0}")]
NoServerCrypto(String),
// hyper errors
#[error("hyper body manipulation error: {0}")]
@ -63,8 +63,8 @@ pub enum RpxyError {
// certificate reloader errors
#[error("No certificate reloader when building a proxy for TLS")]
NoCertificateReloader,
// #[error("Certificate reload error: {0}")]
// CertificateReloadError(#[from] hot_reload::ReloaderError<crate::crypto::ServerCryptoBase>),
#[error("Certificate reload error: {0}")]
CertificateReloadError(#[from] hot_reload::ReloaderError<rpxy_certs::ServerCryptoBase>),
// backend errors
#[error("Invalid reverse proxy setting")]

View file

@ -1,21 +1,15 @@
use super::proxy_main::Proxy;
use crate::{
crypto::CryptoSource,
crypto::{ServerCrypto, ServerCryptoBase},
error::*,
log::*,
name_exp::ByteName,
};
use crate::{error::*, log::*, name_exp::ByteName};
use anyhow::anyhow;
use hot_reload::ReloaderReceiver;
use hyper_util::client::legacy::connect::Connect;
use rpxy_certs::{ServerCrypto, ServerCryptoBase};
use s2n_quic::provider;
use std::sync::Arc;
impl<U, T> Proxy<U, T>
impl<T> Proxy<T>
where
T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
/// Start UDP proxy serving with HTTP/3 request for configured host names
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {
@ -25,7 +19,7 @@ where
info!("Start UDP proxy serving with HTTP/3 request for configured host names [s2n-quic]");
// initially wait for receipt
let mut server_crypto: Option<Arc<ServerCrypto>> = {
let mut server_crypto: Option<s2n_quic_rustls::Server> = {
let _ = server_crypto_rx.changed().await;
let sc = self.receive_server_crypto(server_crypto_rx.clone())?;
Some(sc)
@ -57,16 +51,24 @@ where
}
/// Receive server crypto from reloader
fn receive_server_crypto(
&self,
server_crypto_rx: ReloaderReceiver<ServerCryptoBase>,
) -> RpxyResult<Arc<ServerCrypto>> {
fn receive_server_crypto(&self, server_crypto_rx: ReloaderReceiver<ServerCryptoBase>) -> RpxyResult<s2n_quic_rustls::Server> {
let cert_keys_map = server_crypto_rx.borrow().clone().ok_or_else(|| {
error!("Reloader is broken");
RpxyError::CertificateReloadError(anyhow!("Reloader is broken").into())
})?;
let server_crypto: Option<Arc<ServerCrypto>> = (&cert_keys_map).try_into().ok();
let server_crypto: Option<s2n_quic_rustls::Server> = (&cert_keys_map).try_into().ok().and_then(|v: Arc<ServerCrypto>| {
let rustls_server_config = v.aggregated_config_no_client_auth.clone();
let resolver = rustls_server_config.cert_resolver.clone();
let alpn = rustls_server_config.alpn_protocols.clone();
#[allow(deprecated)]
let tls = provider::tls::rustls::server::Builder::default()
.with_cert_resolver(resolver)
.and_then(|t| t.with_application_protocols(alpn.iter()))
.and_then(|t| t.build())
.ok();
tls
});
server_crypto.ok_or_else(|| {
error!("Failed to update server crypto for h3 [s2n-quic]");
RpxyError::FailedToUpdateServerCrypto("Failed to update server crypto for h3 [s2n-quic]".to_string())
@ -74,7 +76,7 @@ where
}
/// Event loop for UDP proxy serving with HTTP/3 request for configured host names
async fn h3_listener_service_inner(&self, server_crypto: &Option<Arc<ServerCrypto>>) -> RpxyResult<()> {
async fn h3_listener_service_inner(&self, server_crypto: &Option<s2n_quic_rustls::Server>) -> RpxyResult<()> {
// setup UDP socket
let io = provider::io::tokio::Builder::default()
.with_receive_address(self.listening_on)?
@ -97,14 +99,11 @@ where
// setup tls
let Some(server_crypto) = server_crypto else {
warn!("No server crypto is given [s2n-quic]");
return Err(RpxyError::NoServerCrypto(
"No server crypto is given [s2n-quic]".to_string(),
));
return Err(RpxyError::NoServerCrypto("No server crypto is given [s2n-quic]".to_string()));
};
let tls = server_crypto.inner_global_no_client_auth.clone();
let mut server = s2n_quic::Server::builder()
.with_tls(tls)?
.with_tls(server_crypto.to_owned())?
.with_io(io)?
.with_limits(limits)?
.start()?;

1
submodules/s2n-quic Submodule

@ -0,0 +1 @@
Subproject commit d03cc470fa9812d06d204e312e4ada00079e96df

View file

@ -1,18 +0,0 @@
[package]
name = "s2n-quic-h3"
# this in an unpublished internal crate so the version should not be changed
version = "0.1.0"
authors = ["AWS s2n"]
edition = "2021"
rust-version = "1.63"
license = "Apache-2.0"
# this contains an http3 implementation for testing purposes and should not be published
publish = false
[dependencies]
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false }
# h3 = { path = "../h3/h3/" }
h3 = { version = "0.0.4" }
s2n-quic = "1.37.0"
s2n-quic-core = "0.37.0"

View file

@ -1,10 +0,0 @@
# s2n-quic-h3
This is an internal crate used by [s2n-quic](https://github.com/aws/s2n-quic) written as a proof of concept for implementing HTTP3 on top of s2n-quic. The API is not currently stable and should not be used directly.
## License
This project is licensed under the [Apache-2.0 License][license-url].
[license-badge]: https://img.shields.io/badge/license-apache-blue.svg
[license-url]: https://aws.amazon.com/apache-2-0/

View file

@ -1,7 +0,0 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
mod s2n_quic;
pub use self::s2n_quic::*;
pub use h3;

View file

@ -1,506 +0,0 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use bytes::{Buf, Bytes};
use futures::ready;
use h3::quic::{self, Error, StreamId, WriteBuf};
use s2n_quic::stream::{BidirectionalStream, ReceiveStream};
use s2n_quic_core::varint::VarInt;
use std::{
convert::TryInto,
fmt::{self, Display},
sync::Arc,
task::{self, Poll},
};
pub struct Connection {
conn: s2n_quic::connection::Handle,
bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor,
recv_acceptor: s2n_quic::connection::ReceiveStreamAcceptor,
}
impl Connection {
pub fn new(new_conn: s2n_quic::Connection) -> Self {
let (handle, acceptor) = new_conn.split();
let (bidi, recv) = acceptor.split();
Self {
conn: handle,
bidi_acceptor: bidi,
recv_acceptor: recv,
}
}
}
#[derive(Debug)]
pub struct ConnectionError(s2n_quic::connection::Error);
impl std::error::Error for ConnectionError {}
impl fmt::Display for ConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl Error for ConnectionError {
fn is_timeout(&self) -> bool {
matches!(self.0, s2n_quic::connection::Error::IdleTimerExpired { .. })
}
fn err_code(&self) -> Option<u64> {
match self.0 {
s2n_quic::connection::Error::Application { error, .. } => Some(error.into()),
_ => None,
}
}
}
impl From<s2n_quic::connection::Error> for ConnectionError {
fn from(e: s2n_quic::connection::Error) -> Self {
Self(e)
}
}
impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type OpenStreams = OpenStreams;
type Error = ConnectionError;
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? {
Some(x) => x,
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
}
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? {
Some(x) => x.split(),
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::BidiStream {
send: Self::SendStream::new(send),
recv: Self::RecvStream::new(recv),
})))
}
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}
fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
.try_into()
.expect("s2n-quic supports error codes up to 2^62-1"),
);
}
}
pub struct OpenStreams {
conn: s2n_quic::connection::Handle,
}
impl<B> quic::OpenStreams<B> for OpenStreams
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
.try_into()
.unwrap_or_else(|_| VarInt::MAX.into()),
);
}
}
impl Clone for OpenStreams {
fn clone(&self) -> Self {
Self {
conn: self.conn.clone(),
}
}
}
pub struct BidiStream<B>
where
B: Buf,
{
send: SendStream<B>,
recv: RecvStream,
}
impl<B> quic::BidiStream<B> for BidiStream<B>
where
B: Buf,
{
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
fn split(self) -> (Self::SendStream, Self::RecvStream) {
(self.send, self.recv)
}
}
impl<B> quic::RecvStream for BidiStream<B>
where
B: Buf,
{
type Buf = Bytes;
type Error = ReadError;
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
self.recv.poll_data(cx)
}
fn stop_sending(&mut self, error_code: u64) {
self.recv.stop_sending(error_code)
}
fn recv_id(&self) -> StreamId {
self.recv.stream.id().try_into().expect("invalid stream id")
}
}
impl<B> quic::SendStream<B> for BidiStream<B>
where
B: Buf,
{
type Error = SendStreamError;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_ready(cx)
}
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_finish(cx)
}
fn reset(&mut self, reset_code: u64) {
self.send.reset(reset_code)
}
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
self.send.send_data(data)
}
fn send_id(&self) -> StreamId {
self.send.stream.id().try_into().expect("invalid stream id")
}
}
impl<B> From<BidirectionalStream> for BidiStream<B>
where
B: Buf,
{
fn from(bidi: BidirectionalStream) -> Self {
let (recv, send) = bidi.split();
BidiStream {
send: send.into(),
recv: recv.into(),
}
}
}
pub struct RecvStream {
stream: s2n_quic::stream::ReceiveStream,
}
impl RecvStream {
fn new(stream: s2n_quic::stream::ReceiveStream) -> Self {
Self { stream }
}
}
impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
let buf = ready!(self.stream.poll_receive(cx))?;
Ok(buf).into()
}
fn stop_sending(&mut self, error_code: u64) {
let _ = self.stream.stop_sending(
s2n_quic::application::Error::new(error_code)
.expect("s2n-quic supports error codes up to 2^62-1"),
);
}
fn recv_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
}
impl From<ReceiveStream> for RecvStream {
fn from(recv: ReceiveStream) -> Self {
RecvStream::new(recv)
}
}
#[derive(Debug)]
pub struct ReadError(s2n_quic::stream::Error);
impl std::error::Error for ReadError {}
impl fmt::Display for ReadError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl From<ReadError> for Arc<dyn Error> {
fn from(e: ReadError) -> Self {
Arc::new(e)
}
}
impl From<s2n_quic::stream::Error> for ReadError {
fn from(e: s2n_quic::stream::Error) -> Self {
Self(e)
}
}
impl Error for ReadError {
fn is_timeout(&self) -> bool {
matches!(
self.0,
s2n_quic::stream::Error::ConnectionError {
error: s2n_quic::connection::Error::IdleTimerExpired { .. },
..
}
)
}
fn err_code(&self) -> Option<u64> {
match self.0 {
s2n_quic::stream::Error::ConnectionError {
error: s2n_quic::connection::Error::Application { error, .. },
..
} => Some(error.into()),
s2n_quic::stream::Error::StreamReset { error, .. } => Some(error.into()),
_ => None,
}
}
}
pub struct SendStream<B: Buf> {
stream: s2n_quic::stream::SendStream,
chunk: Option<Bytes>,
buf: Option<WriteBuf<B>>, // TODO: Replace with buf: PhantomData<B>
// after https://github.com/hyperium/h3/issues/78 is resolved
}
impl<B> SendStream<B>
where
B: Buf,
{
fn new(stream: s2n_quic::stream::SendStream) -> SendStream<B> {
Self {
stream,
chunk: None,
buf: Default::default(),
}
}
}
impl<B> quic::SendStream<B> for SendStream<B>
where
B: Buf,
{
type Error = SendStreamError;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// try to flush the current chunk if we have one
if let Some(chunk) = self.chunk.as_mut() {
ready!(self.stream.poll_send(chunk, cx))?;
// s2n-quic will take the whole chunk on send, even if it exceeds the limits
debug_assert!(chunk.is_empty());
self.chunk = None;
}
// try to take the next chunk from the WriteBuf
if let Some(ref mut data) = self.buf {
let len = data.chunk().len();
// if the write buf is empty, then clear it and break
if len == 0 {
self.buf = None;
break;
}
// copy the first chunk from WriteBuf and prepare it to flush
let chunk = data.copy_to_bytes(len);
self.chunk = Some(chunk);
// loop back around to flush the chunk
continue;
}
// if we didn't have either a chunk or WriteBuf, then we're ready
break;
}
Poll::Ready(Ok(()))
// TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved
// self.available_bytes = ready!(self.stream.poll_send_ready(cx))?;
// Poll::Ready(Ok(()))
}
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.buf.is_some() {
return Err(Self::Error::NotReady);
}
self.buf = Some(data.into());
Ok(())
// TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved
// let mut data = data.into();
// while self.available_bytes > 0 && data.has_remaining() {
// let len = data.chunk().len();
// let chunk = data.copy_to_bytes(len);
// self.stream.send_data(chunk)?;
// self.available_bytes = self.available_bytes.saturating_sub(len);
// }
// Ok(())
}
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all chunks are flushed to the QUIC stream before finishing
ready!(self.poll_ready(cx))?;
self.stream.finish()?;
Ok(()).into()
}
fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
.reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into()));
}
fn send_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
}
impl<B> From<s2n_quic::stream::SendStream> for SendStream<B>
where
B: Buf,
{
fn from(send: s2n_quic::stream::SendStream) -> Self {
SendStream::new(send)
}
}
#[derive(Debug)]
pub enum SendStreamError {
Write(s2n_quic::stream::Error),
NotReady,
}
impl std::error::Error for SendStreamError {}
impl Display for SendStreamError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
impl From<s2n_quic::stream::Error> for SendStreamError {
fn from(e: s2n_quic::stream::Error) -> Self {
Self::Write(e)
}
}
impl Error for SendStreamError {
fn is_timeout(&self) -> bool {
matches!(
self,
Self::Write(s2n_quic::stream::Error::ConnectionError {
error: s2n_quic::connection::Error::IdleTimerExpired { .. },
..
})
)
}
fn err_code(&self) -> Option<u64> {
match self {
Self::Write(s2n_quic::stream::Error::StreamReset { error, .. }) => {
Some((*error).into())
}
Self::Write(s2n_quic::stream::Error::ConnectionError {
error: s2n_quic::connection::Error::Application { error, .. },
..
}) => Some((*error).into()),
_ => None,
}
}
}
impl From<SendStreamError> for Arc<dyn Error> {
fn from(e: SendStreamError) -> Self {
Arc::new(e)
}
}