tlsbench/makecerts.py

190 lines
5.8 KiB
Python

# Get certificates from domains and make a similar chain.
import os
import OpenSSL
import ssl
#import asn1
# Base directory for all certificate material. fetch_cas() reads
# "<CERTS_DIR><alg>/ca.crt" and "<CERTS_DIR><alg>/ca.key" from it, and the
# script entry point writes its output under "<CERTS_DIR>realistic/".
CERTS_DIR = "/dev/shm/exp/certs/"
# Algorithm names for which fetch_cas() loads a CA certificate and key.
ALGS = ["prime256v1", "secp384r1", "rsa2048", "rsa3072", "rsa4096"]
# Domains whose live server certificate is fetched and re-created.
# Commented-out entries are skipped.
DOMAINS = [
#"txmn.tk",
"wikipedia.org",
#"youtube.com"
]
def sh(cmds):
    """Echo and run one shell command or a sequence of shell commands.

    Every command is printed and then handed to ``os.system``. The exit
    status is not inspected, so a failing command does not stop the
    remaining ones.

    Args:
        cmds: A single command string, or a list/tuple of command strings
            that are run in order.

    Raises:
        TypeError: If ``cmds`` is neither a string nor a list/tuple.
    """
    # isinstance() (rather than an exact type comparison) also accepts
    # subclasses of str/list, and tuples of commands.
    if isinstance(cmds, str):
        cmds = [cmds]
    elif not isinstance(cmds, (list, tuple)):
        raise TypeError(
            f"cmds must be a str or a list of str, not {type(cmds).__name__}"
        )
    for cmd in cmds:
        print(cmd)
        os.system(cmd)
def make_sk(outpath, alg):
    """Generate a private key for ``alg`` and store it at ``outpath``.

    The openssl command line(s) for the requested algorithm are passed to
    ``sh``. For ed25519 and the EC curves the key is first written to
    ``{outpath}.sec1`` and then converted with ``openssl pkcs8 -topk8``
    into ``outpath``; the intermediate ``.sec1`` file is not removed.
    RSA keys are written to ``outpath`` directly by ``openssl genrsa``.

    Args:
        outpath: Destination path of the PEM private key.
        alg: One of "ed25519", "prime256v1", "secp384r1", "rsa2048",
            "rsa3072" or "rsa4096".

    Raises:
        KeyError: If ``alg`` is not one of the names above.
    """
    # Second step shared by every algorithm that goes through a .sec1 file.
    to_pkcs8 = (
        f"openssl pkcs8 -topk8 -nocrypt -in {outpath}.sec1"
        f" -out {outpath} -outform PEM"
    )

    recipes = {
        "ed25519": [
            f"openssl genpkey -out {outpath}.sec1 -algorithm ed25519",
            to_pkcs8,
        ],
    }
    for curve in ("prime256v1", "secp384r1"):
        recipes[curve] = [
            f"openssl ecparam -genkey -name {curve} -noout -out {outpath}.sec1",
            to_pkcs8,
        ]
    for bits in (2048, 3072, 4096):
        recipes[f"rsa{bits}"] = [f"openssl genrsa -out {outpath} {bits}"]

    sh(recipes[alg])
# Connect to a server and return its certificate
def get_server_cert(domain, port=443):
    """Fetch the certificate presented by the TLS server at ``domain:port``.

    Args:
        domain: Host name to connect to.
        port: TCP port of the TLS service.

    Returns:
        The certificate returned by ``ssl.get_server_certificate``, parsed
        from PEM into an ``OpenSSL.crypto.X509`` object.
    """
    cert_pem = ssl.get_server_certificate((domain, port))
    # The former loop over the certificate's extensions only assigned
    # unused locals (its sole condition was commented out), so it is gone.
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
# Map a certificate's signature algorithm name to the digest used to re-sign
def _digest_for_signature(sig_alg):
    """Return "sha384", "sha256" or "sha512" for a signature algorithm name.

    The comparison is case-insensitive because the names differ in case
    between algorithm families (e.g. b"ecdsa-with-SHA256" versus
    b"sha256WithRSAEncryption").

    Args:
        sig_alg: Signature algorithm name as bytes, as returned by
            ``X509.get_signature_algorithm()``.

    Raises:
        ValueError: If none of the supported digest names occurs in
            ``sig_alg``.
    """
    lowered = sig_alg.lower()
    for digest in ("sha384", "sha256", "sha512"):
        if digest.encode() in lowered:
            return digest
    raise ValueError(f"Unknown signature algorithm: {sig_alg!r}")


# Replace CA and key of a certificate while keeping everything else the same
def replace_keys(cert, key, cas):
    """Build a copy of ``cert`` that carries ``key`` and is signed by a local CA.

    Version, serial number, validity period and subject are copied from
    ``cert``. The issuer becomes the subject of the selected CA and the
    public key becomes ``key``. Extensions are copied as raw DER, except
    authorityKeyIdentifier, which is regenerated from the CA certificate.

    Args:
        cert: The ``OpenSSL.crypto.X509`` certificate to imitate.
        key: The ``OpenSSL.crypto.PKey`` to place in the new certificate.
        cas: Mapping ``(key type, key bits) -> (ca_cert, ca_key)`` as built
            by ``fetch_cas``.

    Returns:
        The newly created and signed ``OpenSSL.crypto.X509``.

    Raises:
        KeyError: If ``cas`` has no entry for the type and size of ``key``.
        ValueError: If the digest of ``cert``'s signature algorithm is not
            SHA-256, SHA-384 or SHA-512.
    """
    # Choose the CA whose key has the same type and size as the new key
    ca_cert, ca_key = cas[(key.type(), key.bits())]

    # Create new certificate
    cert2 = OpenSSL.crypto.X509()

    # Set fields
    cert2.set_version(cert.get_version())
    cert2.set_issuer(ca_cert.get_subject())
    cert2.set_serial_number(cert.get_serial_number())
    cert2.set_notAfter(cert.get_notAfter())
    cert2.set_notBefore(cert.get_notBefore())
    cert2.set_subject(cert.get_subject())
    cert2.set_pubkey(key)

    # Set extensions.
    # X509Extension takes its value in the x509v3_config text syntax
    # (https://docs.openssl.org/3.0/man5/x509v3_config), which differs from
    # both the DER returned by get_data() and the text returned by str().
    # The "DER:<hex>" form of that syntax lets the original encoding be
    # passed through unchanged.
    exts = []
    for i in range(cert.get_extension_count()):
        ext = cert.get_extension(i)
        ext_name = ext.get_short_name()
        if ext_name == b"authorityKeyIdentifier":
            # Must point at the new issuer, so it is derived from ca_cert.
            ext_data = b"keyid:always"
        else:
            # NOTE(review): this also copies subjectKeyIdentifier verbatim;
            # presumably it still reflects the original key -- confirm that
            # this is acceptable for the intended use.
            ext_data = b"DER:" + ext.get_data().hex().encode()
        exts.append(
            OpenSSL.crypto.X509Extension(
                ext_name, ext.get_critical() == 1, ext_data, issuer=ca_cert
            )
        )
    cert2.add_extensions(exts)

    # Sign with the same digest as the original certificate
    cert2.sign(ca_key, _digest_for_signature(cert.get_signature_algorithm()))
    return cert2
# Save certificate to a PEM file
def save_cert(cert, out_path):
    """Write ``cert`` to ``out_path`` in PEM format.

    Args:
        cert: The ``OpenSSL.crypto.X509`` certificate to serialize.
        out_path: Path of the file to create or overwrite.
    """
    # Serialize before opening so a failing dump does not truncate the file.
    pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    # The context manager closes the file even if the write raises.
    with open(out_path, "wb") as f:
        f.write(pem)
# Fetch CA certs and keys and map them by algorithm
def fetch_cas():
    """Load the CA certificate and private key for every entry of ``ALGS``.

    For each algorithm name the files ``{CERTS_DIR}{alg}/ca.crt`` and
    ``{CERTS_DIR}{alg}/ca.key`` are read as PEM.

    Returns:
        A dict mapping ``(ca_key.type(), ca_key.bits())`` to the tuple
        ``(ca_cert, ca_key)``.
    """
    cas = {}
    for alg in ALGS:
        # Context managers close the files even if reading raises.
        with open(f"{CERTS_DIR}{alg}/ca.crt", "rb") as f:
            ca_cert_pem = f.read()
        with open(f"{CERTS_DIR}{alg}/ca.key", "rb") as f:
            ca_key_pem = f.read()
        ca_cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, ca_cert_pem
        )
        ca_key = OpenSSL.crypto.load_privatekey(
            OpenSSL.crypto.FILETYPE_PEM, ca_key_pem
        )
        cas[(ca_key.type(), ca_key.bits())] = (ca_cert, ca_key)
    return cas
if __name__ == "__main__":
    # For every domain: fetch its live certificate, generate a fresh key of
    # the same type and size, re-issue the certificate under the matching
    # local CA, and store key and certificate under "<CERTS_DIR>realistic/".
    os.makedirs(f"{CERTS_DIR}realistic", exist_ok=True)
    cas = fetch_cas()

    # make_sk() algorithm name for each supported (key type, key size) pair.
    # Built once instead of on every loop iteration. A public key outside
    # this table makes the lookup below raise KeyError.
    alg_by_key = {
        (OpenSSL.crypto.TYPE_EC, 256): "prime256v1",
        (OpenSSL.crypto.TYPE_EC, 384): "secp384r1",
        (OpenSSL.crypto.TYPE_RSA, 2048): "rsa2048",
        (OpenSSL.crypto.TYPE_RSA, 3072): "rsa3072",
        (OpenSSL.crypto.TYPE_RSA, 4096): "rsa4096",
    }

    for domain in DOMAINS:
        cert = get_server_cert(domain)
        pk = cert.get_pubkey()
        pk_alg = (pk.type(), pk.bits())
        alg = alg_by_key[pk_alg]

        key2_path = f"{CERTS_DIR}realistic/{domain}.key"
        make_sk(key2_path, alg)
        # The context manager closes the key file even if parsing raises.
        with open(key2_path, "rb") as f:
            key2 = OpenSSL.crypto.load_privatekey(
                OpenSSL.crypto.FILETYPE_PEM, f.read()
            )

        cert2 = replace_keys(cert, key2, cas)
        save_cert(cert2, f"{CERTS_DIR}realistic/{domain}.crt")