# Get certificates from domains and make a similar chain.

import os
import ssl

import OpenSSL

CERTS_DIR = "/dev/shm/exp/certs/"

ALGS = ["prime256v1", "secp384r1", "rsa2048", "rsa3072", "rsa4096"]

DOMAINS = [
    #"txmn.tk",
    #"wikipedia.org",
    "youtube.com"
]


def sh(cmds):
    """Echo and run shell command(s) through ``os.system``.

    Args:
        cmds: a single command string, or a list of command strings that are
            run in order.

    Raises:
        TypeError: if ``cmds`` is neither a ``list`` nor a ``str``.
        RuntimeError: if a command returns a non-zero status; the remaining
            commands are not run.
    """
    if isinstance(cmds, str):
        cmds = [cmds]
    elif not isinstance(cmds, list):
        raise TypeError(f"expected list or str, got {type(cmds).__name__}")

    for cmd in cmds:
        print(cmd)
        status = os.system(cmd)
        # Fail early: otherwise a stale or missing output file would only be
        # noticed (or silently reused) by whoever reads it next.
        if status != 0:
            raise RuntimeError(f"Command failed with status {status}: {cmd}")


def make_sk(outpath, alg):
    """Generate a private key at ``outpath`` with the ``openssl`` CLI.

    For the EC and ed25519 algorithms the key is first written to
    ``{outpath}.sec1`` and then converted to unencrypted PKCS#8 PEM at
    ``outpath``; the intermediate ``.sec1`` file is left in place. RSA keys
    are written straight to ``outpath`` by ``openssl genrsa``.

    Args:
        outpath: destination path of the key file (interpolated into the
            shell command as-is).
        alg: one of ``"ed25519"``, ``"prime256v1"``, ``"secp384r1"``,
            ``"rsa2048"``, ``"rsa3072"``, ``"rsa4096"``.

    Raises:
        KeyError: if ``alg`` is not one of the names above.
        RuntimeError: if an ``openssl`` command fails (see ``sh``).
    """
    to_pkcs8 = f"openssl pkcs8 -topk8 -nocrypt -in {outpath}.sec1 -out {outpath} -outform PEM"
    commands = {
        "ed25519": [
            f"openssl genpkey -out {outpath}.sec1 -algorithm ed25519",
            to_pkcs8,
        ],
        "prime256v1": [
            f"openssl ecparam -genkey -name prime256v1 -noout -out {outpath}.sec1",
            to_pkcs8,
        ],
        "rsa2048": [
            f"openssl genrsa -out {outpath} 2048",
        ],
        "rsa3072": [
            f"openssl genrsa -out {outpath} 3072",
        ],
        "rsa4096": [
            f"openssl genrsa -out {outpath} 4096",
        ],
        "secp384r1": [
            f"openssl ecparam -genkey -name secp384r1 -noout -out {outpath}.sec1",
            to_pkcs8,
        ],
    }
    sh(commands[alg])


# Connect to a server and return its certificate
def get_server_cert(domain, port=443):
    """Fetch the certificate presented by ``domain:port``.

    Args:
        domain: host name to connect to.
        port: TCP port (default 443).

    Returns:
        The certificate parsed into an ``OpenSSL.crypto.X509`` object.
    """
    cert_pem = ssl.get_server_certificate((domain, port))
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)


def _digest_for(sig_alg):
    """Map a signature algorithm name (bytes) to a digest name for signing.

    Raises:
        ValueError: if the name mentions none of SHA-384, SHA-256, SHA-512.
    """
    lowered = sig_alg.lower()
    for digest in ("sha384", "sha256", "sha512"):
        if digest.encode() in lowered:
            return digest
    raise ValueError(f"Unknown signature algorithm: {sig_alg!r}")


# Replace CA and key of a certificate while keeping everything else the same
def replace_keys(cert, key, cas):
    """Build a copy of ``cert`` that carries ``key`` and is signed by a local CA.

    Version, serial number, validity dates, subject and all extensions are
    copied from ``cert``. The issuer is the subject of the chosen CA, and the
    public key is ``key``.

    Args:
        cert: the original ``OpenSSL.crypto.X509`` certificate.
        key: the replacement ``OpenSSL.crypto.PKey``.
        cas: mapping ``(key type, key bits) -> (ca_cert, ca_key)`` as built by
            ``fetch_cas``.

    Returns:
        The new, signed ``OpenSSL.crypto.X509`` certificate.

    Raises:
        KeyError: if ``cas`` has no CA for the type/size of ``key``.
        ValueError: if the digest of the original signature algorithm is not
            SHA-256, SHA-384 or SHA-512.
    """
    # Choose the CA whose key has the same type and size as the new key
    ca_cert, ca_key = cas[(key.type(), key.bits())]

    # Create new certificate
    cert2 = OpenSSL.crypto.X509()

    # Set fields
    cert2.set_version(cert.get_version())
    cert2.set_issuer(ca_cert.get_subject())
    cert2.set_serial_number(cert.get_serial_number())
    cert2.set_notAfter(cert.get_notAfter())
    cert2.set_notBefore(cert.get_notBefore())
    cert2.set_subject(cert.get_subject())
    cert2.set_pubkey(key)

    # Set extensions
    # X509Extension takes its value in the OpenSSL config text syntax
    # (https://docs.openssl.org/3.0/man5/x509v3_config), while get_data()
    # returns raw ASN.1 and str(ext) is a display form that is not valid
    # input. So every extension is passed through verbatim as "DER:<hex>",
    # except authorityKeyIdentifier, which is rebuilt against the new issuer
    # certificate with "keyid:always".
    exts = []
    for i in range(cert.get_extension_count()):
        ext = cert.get_extension(i)
        ext_name = ext.get_short_name()
        if ext_name == b"authorityKeyIdentifier":
            ext_value = b"keyid:always"
        else:
            ext_value = b"DER:" + ext.get_data().hex().encode()
        exts.append(
            OpenSSL.crypto.X509Extension(
                ext_name,
                ext.get_critical() == 1,
                ext_value,
                issuer=ca_cert,
            )
        )
    cert2.add_extensions(exts)

    # Sign with the same digest the original certificate was signed with
    cert2.sign(ca_key, _digest_for(cert.get_signature_algorithm()))

    return cert2


# Save certificate to a PEM file
def save_cert(cert, out_path):
    """Write ``cert`` to ``out_path`` in PEM format, overwriting the file."""
    pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    with open(out_path, "wb") as f:
        f.write(pem)


# Fetch CA certs and keys and map them by algorithm
def fetch_cas():
    """Load the CA certificate and private key of every algorithm in ``ALGS``.

    Reads ``{CERTS_DIR}{alg}/ca.crt`` and ``{CERTS_DIR}{alg}/ca.key`` (PEM).

    Returns:
        A dict mapping ``(key type, key bits)`` of each CA key to the tuple
        ``(ca_cert, ca_key)``.
    """
    cas = {}
    for alg in ALGS:
        with open(f"{CERTS_DIR}{alg}/ca.crt", "rb") as f:
            ca_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
        with open(f"{CERTS_DIR}{alg}/ca.key", "rb") as f:
            ca_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())
        cas[(ca_key.type(), ca_key.bits())] = (ca_cert, ca_key)
    return cas


def main():
    """Create a look-alike certificate and fresh key for every domain in ``DOMAINS``.

    Output goes to ``{CERTS_DIR}realistic/{domain}.key`` and
    ``{CERTS_DIR}realistic/{domain}.crt``.
    """
    os.makedirs(f"{CERTS_DIR}realistic", exist_ok=True)

    cas = fetch_cas()

    # Public key (type, bits) of the real certificate -> name understood by make_sk
    alg_names = {
        (OpenSSL.crypto.TYPE_EC, 256): "prime256v1",
        (OpenSSL.crypto.TYPE_EC, 384): "secp384r1",
        (OpenSSL.crypto.TYPE_RSA, 2048): "rsa2048",
        (OpenSSL.crypto.TYPE_RSA, 3072): "rsa3072",
        (OpenSSL.crypto.TYPE_RSA, 4096): "rsa4096",
    }

    for domain in DOMAINS:
        cert = get_server_cert(domain)
        pk = cert.get_pubkey()
        alg = alg_names[(pk.type(), pk.bits())]

        key2_path = f"{CERTS_DIR}realistic/{domain}.key"
        make_sk(key2_path, alg)
        with open(key2_path, "rb") as f:
            key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())

        cert2 = replace_keys(cert, key2, cas)
        save_cert(cert2, f"{CERTS_DIR}realistic/{domain}.crt")


if __name__ == "__main__":
    main()