190 lines
5.8 KiB
Python
190 lines
5.8 KiB
Python
# Fetch the certificates served by real domains and re-issue look-alike copies,
# each with a freshly generated key and signed by a local CA from CERTS_DIR.
|
|
|
|
import os
|
|
import OpenSSL
|
|
import ssl
|
|
#import asn1
|
|
|
|
# Root directory holding the per-algorithm CA material and the generated output.
CERTS_DIR = "/dev/shm/exp/certs/"

# Key algorithms for which a CA certificate/key pair is loaded from
# <CERTS_DIR><alg>/ca.crt and <CERTS_DIR><alg>/ca.key.
ALGS = [
    "prime256v1",
    "secp384r1",
    "rsa2048",
    "rsa3072",
    "rsa4096",
]

# Domains whose served certificate is fetched and re-issued.
# Other candidates, currently disabled: "txmn.tk", "wikipedia.org".
DOMAINS = ["youtube.com"]
|
|
|
|
def sh(cmds):
    """Echo and run one or more shell commands through ``os.system``.

    Args:
        cmds: A single command string, or a list of command strings that
            are run in order.

    Raises:
        TypeError: If ``cmds`` is neither a ``str`` nor a ``list``.

    Exit statuses are not inspected, so a failing command does not stop
    the commands that follow it.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of str and list.
    if isinstance(cmds, str):
        cmds = [cmds]
    elif not isinstance(cmds, list):
        raise TypeError(
            f"cmds must be a str or a list of str, not {type(cmds).__name__}"
        )
    for cmd in cmds:
        print(cmd)
        os.system(cmd)
|
|
|
|
def make_sk(outpath, alg):
    """Generate a private key with the ``openssl`` command-line tool.

    Args:
        outpath: Path of the key file to produce. For ed25519 and the EC
            curves an intermediate ``<outpath>.sec1`` file is written first
            and then converted to PKCS#8 PEM at ``outpath``.
        alg: One of "ed25519", "prime256v1", "secp384r1", "rsa2048",
            "rsa3072" or "rsa4096".

    Raises:
        KeyError: If ``alg`` is not one of the names above.
    """
    sec1_path = f"{outpath}.sec1"
    # Second step shared by every algorithm that goes through a .sec1 file.
    to_pkcs8 = f"openssl pkcs8 -topk8 -nocrypt -in {sec1_path} -out {outpath} -outform PEM"

    recipes = {
        "ed25519": [
            f"openssl genpkey -out {sec1_path} -algorithm ed25519",
            to_pkcs8,
        ],
    }
    for curve in ("prime256v1", "secp384r1"):
        recipes[curve] = [
            f"openssl ecparam -genkey -name {curve} -noout -out {sec1_path}",
            to_pkcs8,
        ]
    for bits in (2048, 3072, 4096):
        recipes[f"rsa{bits}"] = [f"openssl genrsa -out {outpath} {bits}"]

    sh(recipes[alg])
|
|
|
|
# Connect to a server and return its certificate
|
|
def get_server_cert(domain, port=443):
    """Connect to ``domain:port`` and return the certificate it serves.

    Args:
        domain: Host name to connect to.
        port: Port of the TLS service (default 443).

    Returns:
        The certificate, loaded from the PEM text returned by
        ``ssl.get_server_certificate`` via
        ``OpenSSL.crypto.load_certificate``.
    """
    cert_pem = ssl.get_server_certificate((domain, port))
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
|
|
|
|
# Replace CA and key of a certificate while keeping everything else the same
|
|
def _digest_for_signature(sig_alg):
    """Return the digest name ("sha384", "sha256" or "sha512") found in ``sig_alg``.

    ``sig_alg`` is the signature algorithm name as bytes; matching is
    case-insensitive. Raises ValueError when none of the digests is named.
    """
    lowered = sig_alg.lower()
    for digest in ("sha384", "sha256", "sha512"):
        if digest.encode() in lowered:
            return digest
    raise ValueError(f"Unknown signature algorithm: {sig_alg!r}")


def replace_keys(cert, key, cas):
    """Re-issue ``cert`` with a new public key, signed by a matching CA.

    Version, serial number, validity dates, subject and extensions are
    copied from ``cert``; the issuer becomes the chosen CA's subject and
    the authorityKeyIdentifier extension is regenerated for that CA.

    Args:
        cert: The certificate to imitate.
        key: The key whose public part goes into the new certificate.
        cas: Mapping from ``(key type, key bits)`` to a
            ``(CA certificate, CA private key)`` tuple.

    Returns:
        The newly built and signed certificate.

    Raises:
        KeyError: If ``cas`` has no entry for the type and size of ``key``.
        ValueError: If the digest of ``cert``'s signature algorithm is not
            SHA-256, SHA-384 or SHA-512.
    """
    # Choose the CA whose key has the same type and size as the new key.
    ca_cert, ca_key = cas[(key.type(), key.bits())]

    # Create the new certificate and copy the plain fields.
    cert2 = OpenSSL.crypto.X509()
    cert2.set_version(cert.get_version())
    cert2.set_issuer(ca_cert.get_subject())
    cert2.set_serial_number(cert.get_serial_number())
    cert2.set_notAfter(cert.get_notAfter())
    cert2.set_notBefore(cert.get_notBefore())
    cert2.set_subject(cert.get_subject())
    cert2.set_pubkey(key)

    # Copy the extensions. X509Extension takes its value in the textual
    # x509v3_config syntax (https://docs.openssl.org/3.0/man5/x509v3_config)
    # while get_data() returns the DER encoding, so the raw bytes are passed
    # through with the "DER:" prefix instead of being re-parsed.
    # NOTE(review): subjectKeyIdentifier is copied verbatim like every other
    # extension, so it is not recomputed for the new key; confirm this is
    # intended.
    exts = []
    for i in range(cert.get_extension_count()):
        ext = cert.get_extension(i)
        ext_name = ext.get_short_name()
        if ext_name == b"authorityKeyIdentifier":
            # Must identify the new issuer, so let OpenSSL derive it from ca_cert.
            ext_value = b"keyid:always"
        else:
            ext_value = b"DER:" + ext.get_data().hex().encode()
        exts.append(
            OpenSSL.crypto.X509Extension(
                ext_name, ext.get_critical() == 1, ext_value, issuer=ca_cert
            )
        )
    cert2.add_extensions(exts)

    # Sign with the CA key, using the same digest as the original signature.
    cert2.sign(ca_key, _digest_for_signature(cert.get_signature_algorithm()))

    return cert2
|
|
|
|
# Save certificate to a PEM file
|
|
def save_cert(cert, out_path):
    """Write ``cert`` to ``out_path`` as PEM, overwriting any existing file.

    Args:
        cert: The certificate to serialize.
        out_path: Destination file path.
    """
    # Serialize before opening the file so that a serialization failure
    # does not leave an empty or truncated file behind.
    pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
    with open(out_path, "wb") as f:
        f.write(pem)
|
|
|
|
# Fetch CA certs and keys and map them by algorithm
|
|
def fetch_cas():
    """Load the CA certificate and private key of every algorithm in ``ALGS``.

    For each algorithm, ``<CERTS_DIR><alg>/ca.crt`` and
    ``<CERTS_DIR><alg>/ca.key`` are read as PEM.

    Returns:
        A dict mapping ``(key type, key bits)`` of each CA key to the
        ``(CA certificate, CA private key)`` tuple.
    """
    cas = {}
    for alg in ALGS:
        # Context managers close the files even if parsing raises.
        with open(f"{CERTS_DIR}{alg}/ca.crt", "rb") as f:
            ca_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, f.read())
        with open(f"{CERTS_DIR}{alg}/ca.key", "rb") as f:
            ca_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())
        cas[(ca_key.type(), ca_key.bits())] = (ca_cert, ca_key)
    return cas
|
|
|
|
if __name__ == "__main__":
    # For every domain: fetch its certificate, generate a fresh key of the
    # same type and size, and save a re-issued copy signed by the local CA.
    os.makedirs(f"{CERTS_DIR}realistic", exist_ok=True)
    cas = fetch_cas()

    # Maps (key type, key bits) of a served public key to the algorithm name
    # understood by make_sk. Built once; it does not depend on the domain.
    alg_names = {
        (OpenSSL.crypto.TYPE_EC, 256): "prime256v1",
        (OpenSSL.crypto.TYPE_EC, 384): "secp384r1",
        (OpenSSL.crypto.TYPE_RSA, 2048): "rsa2048",
        (OpenSSL.crypto.TYPE_RSA, 3072): "rsa3072",
        (OpenSSL.crypto.TYPE_RSA, 4096): "rsa4096",
    }

    for domain in DOMAINS:
        cert = get_server_cert(domain)
        pk = cert.get_pubkey()
        # A key type/size outside the table raises KeyError.
        alg = alg_names[(pk.type(), pk.bits())]

        key2_path = f"{CERTS_DIR}realistic/{domain}.key"
        make_sk(key2_path, alg)
        # Context manager closes the key file even if parsing raises.
        with open(key2_path, "rb") as f:
            key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, f.read())

        cert2 = replace_keys(cert, key2, cas)
        save_cert(cert2, f"{CERTS_DIR}realistic/{domain}.crt")
|