tlsbench/crawler.py

228 lines
5.9 KiB
Python

import json,re, subprocess, sys, threading, time
# --- Crawl tunables --------------------------------------------------------
# A checkpoint of `results` is written every WRITE_INTERVAL input lines.
WRITE_INTERVAL = 1000
# Number of probes started before the crawler stops to wait for them.
BATCH = 200
# Seconds, counted from a probe's start time, before its openssl child is killed.
TIMEOUT = 4

# --- Transcript parsing ----------------------------------------------------
# One regex per field extracted from an `openssl s_client` transcript.
# Group 1 of each pattern is the captured value.
CAPTURES = {
    "read_bytes": "SSL handshake has read (\\d+) bytes and written \\d+ bytes\n",
    "written_bytes": "SSL handshake has read \\d+ bytes and written (\\d+) bytes\n",
    "cert_sig": "Peer signature type: ([a-zA-Z0-9_.-]+)\n",
    "cert_pk_size": "Server public key is (\\d+) bit\n",
    # Earlier variant that stopped the capture at the first comma:
    #"kx": "(?:Negotiated TLS1\\.3 group|Peer Temp Key): ([a-zA-Z0-9_.-]+)(?:\n|,)",
    # Current variant captures the whole line, e.g. "X25519, 253 bits".
    "kx": "(?:Negotiated TLS1\\.3 group|Peer Temp Key): ([a-zA-Z0-9_., -]+)\n",
    "cipher": "Cipher is ([a-zA-Z0-9_.-]+)\n",
    "protocol": "Protocol: ([a-zA-Z0-9_.-]+)\n",
}

# --- Shared crawl state ----------------------------------------------------
# Guards `results`, which probe threads fill and the main thread dumps.
lock = threading.Lock()
# Maps each probed domain to its raw s_client transcript.
results = {}
# Not referenced anywhere in the visible part of the file.
subs = []
def probe(domain, ossl):
    """Store the transcript of one ``openssl s_client`` run in ``results``.

    Args:
        domain: Key under which the transcript is recorded in the shared
            ``results`` dict.
        ossl: A started ``subprocess.Popen`` whose stdin, stdout and stderr
            are pipes.

    The transcript is decoded leniently: bytes that are not valid UTF-8 are
    replaced rather than raising, so one odd certificate cannot abort the
    probe thread.
    """
    try:
        # communicate() closes stdin (end of input cleanly stops the session
        # and the process), drains stdout *and* stderr so the child can never
        # block on a full pipe, and waits for the child so it is reaped.
        raw, _ = ossl.communicate()
    finally:
        # No-op once the child has exited; guarantees it is gone if
        # communicate() itself failed.
        ossl.kill()
    transcript = raw.decode(errors="replace")
    # Hold the lock only for the dict update, never across blocking I/O, so a
    # slow host cannot stall the other probes or the checkpoint writer.
    with lock:
        results[domain] = transcript
# Fields that are matched but deliberately excluded from the statistics.
_IGNORED_FIELDS = frozenset(["domain", "read_bytes", "written_bytes"])

# Exact "Peer signature type" values -> certificate summary label.
_ECDSA_CERTS = {
    "ecdsa_secp256r1_sha256": "secp256r1",
    "ecdsa_secp384r1_sha384": "secp384r1",
    "ecdsa_secp521r1_sha512": "secp521r1",
}

# Summary label -> substrings of the cipher name that select it (first hit wins).
_CIPHER_MARKERS = (
    ("aes128", ("AES_128", "AES128")),
    ("aes256", ("AES_256", "AES256")),
    ("chacha20", ("CHACHA20",)),
)

# Exact key-exchange captures -> summary label.  "DH, N bits" transcripts are
# counted under the "rsaN" keys; the labels are kept for output compatibility.
_KX_LABELS = {
    "X25519MLKEM768": "x25519mlkem768",
    "X25519, 253 bits": "x25519",
    "DH, 2048 bits": "rsa2048",
    "DH, 3072 bits": "rsa3072",
    "DH, 4096 bits": "rsa4096",
    "ECDH, prime256v1, 256 bits": "secp256r1",
    "ECDH, secp384r1, 384 bits": "secp384r1",
    "ECDH, secp521r1, 521 bits": "secp521r1",
}

# Exact "Protocol:" captures -> summary label.
_PROTOCOL_LABELS = {"TLSv1.3": "1.3", "TLSv1.2": "1.2"}


def _cert_label(fields):
    """Return the certificate label: "none", a curve, "rsa<bits>", or ""."""
    sig = fields.get("cert_sig")
    if sig is None:
        return "none"
    if sig in _ECDSA_CERTS:
        return _ECDSA_CERTS[sig]
    if "rsa" in sig:
        # A missing key-size line yields plain "rsa" instead of a KeyError.
        return "rsa{}".format(fields.get("cert_pk_size", ""))
    return ""


def _cipher_label(fields):
    """Return the cipher label: "none", "aes128", "aes256", "chacha20", or ""."""
    name = fields.get("cipher")
    if name is None:
        return "none"
    for label, markers in _CIPHER_MARKERS:
        if any(marker in name for marker in markers):
            return label
    return ""


def _lookup_label(fields, key, table):
    """Return "none" if *key* was not captured, else its label in *table* or ""."""
    if key not in fields:
        return "none"
    return table.get(fields[key], "")


def summarize_handshakes(outputs, captures=None):
    """Aggregate s_client transcripts into per-field and per-category counts.

    Args:
        outputs: Mapping of domain -> transcript text.
        captures: Mapping of field name -> regex source whose group 1 is the
            value.  Defaults to the module-level ``CAPTURES``.

    Returns:
        A ``(stats, summary, combinations)`` tuple of plain dicts:
        ``stats[field][raw_value]`` counts raw captured values,
        ``summary[category][label]`` counts normalised labels, and
        ``combinations[(protocol, cert, kx, cipher)]`` counts label tuples.
        Unrecognised values get the label ``""``; they are left out of
        ``summary`` but still appear in ``combinations``.
    """
    if captures is None:
        captures = CAPTURES
    regexes = {field: re.compile(pattern) for field, pattern in captures.items()}
    stats = {}
    summary = {
        "cert": {
            "none": 0,
            "secp256r1": 0,
            "secp384r1": 0,
            "secp521r1": 0,
            "rsa512": 0,
            "rsa1024": 0,
            "rsa2048": 0,
            "rsa3072": 0,
            "rsa4096": 0,
        },
        "cipher": {
            "none": 0,
            "aes128": 0,
            "aes256": 0,
            "chacha20": 0,
        },
        "kx": {
            "none": 0,
            "x25519mlkem768": 0,
            "x25519": 0,
            "rsa2048": 0,
            "rsa3072": 0,
            "rsa4096": 0,
            "secp256r1": 0,
            "secp384r1": 0,
            "secp521r1": 0,
        },
        "version": {
            "none": 0,
            "1.2": 0,
            "1.3": 0,
        },
    }
    combinations = {}
    for transcript in outputs.values():
        fields = {}
        for field, regex in regexes.items():
            found = regex.search(transcript)  # first occurrence only
            if found is None or field in _IGNORED_FIELDS:
                continue
            value = found.group(1)
            per_field = stats.setdefault(field, {})
            per_field[value] = per_field.get(value, 0) + 1
            fields[field] = value

        cert = _cert_label(fields)
        cipher = _cipher_label(fields)
        kx = _lookup_label(fields, "kx", _KX_LABELS)
        protocol = _lookup_label(fields, "protocol", _PROTOCOL_LABELS)
        labelled = (("cert", cert), ("cipher", cipher), ("kx", kx), ("version", protocol))
        for category, label in labelled:
            if label != "":
                bucket = summary[category]
                # .get() so an unlisted label (e.g. "rsa8192") opens a new
                # bucket instead of raising KeyError.
                bucket[label] = bucket.get(label, 0) + 1

        combination = (protocol, cert, kx, cipher)
        combinations[combination] = combinations.get(combination, 0) + 1
    return stats, summary, combinations


def _print_table(table):
    """Print a two-level dict as "category:" headers followed by its counts."""
    for category, counts in table.items():
        print(f"{category}:")
        for item, count in counts.items():
            print(" {}:\t{}".format(item, count))


def _report(dump_path, flags):
    """Load a crawl dump and print the views selected by -t / -s / -c."""
    with open(dump_path, "r") as dump:
        outputs = json.load(dump)
    stats, summary, combinations = summarize_handshakes(outputs)
    if "-t" in flags:  # text output
        _print_table(stats)
    if "-s" in flags:  # summary
        _print_table(summary)
    if "-c" in flags:  # combinations, least frequent first
        for combination in sorted(combinations, key=combinations.get):
            print("{}\t{}".format(combination, combinations[combination]))
    else:
        # NOTE(review): the raw dict is printed whenever -c is absent, even
        # alongside -t or -s. Kept as-is; confirm whether it was meant to be
        # the fallback only when no output flag is given.
        print(stats)


def _dump_results(path):
    """Write the shared ``results`` dict to *path* as JSON, under the lock."""
    with open(path, "w") as out:
        with lock:
            json.dump(results, out)


def _reap(workers):
    """Wait for each probe until its TIMEOUT deadline, kill its child, empty the list."""
    for thread, ossl, started in workers:
        thread.join(max(0, TIMEOUT - (time.time() - started)))
        try:
            ossl.kill()
        except OSError:
            # Best effort: the child may already be gone.
            pass
    workers.clear()


def _crawl(list_path):
    """Probe port 443 of every domain listed in *list_path* with openssl s_client.

    The first line of the list is skipped. Checkpoints are written to
    /dev/shm/crawl-<n>.json every WRITE_INTERVAL lines and the final result to
    /dev/shm/crawl.json.
    """
    with open(list_path, "r") as source:
        lines = source.readlines()[1:]
    workers = []
    for index, line in enumerate(lines):
        if index % WRITE_INTERVAL == 0:
            _dump_results(f"/dev/shm/crawl-{index}.json")
        # NOTE(review): keeps everything up to, but excluding, the character
        # right before the first comma. With no comma, find() returns -1 and
        # the last two characters are dropped. Confirm against the list format.
        domain = line[:line.find(",") - 1]
        print("start", domain)
        ossl = subprocess.Popen(
            ["openssl", "s_client", "-no-interactive", f"{domain}:443"],
            stdout=subprocess.PIPE,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        thread = threading.Thread(target=probe, args=(domain, ossl))
        workers.append((thread, ossl, time.time()))
        time.sleep(0.02)  # pace process creation
        thread.start()
        if len(workers) >= BATCH:
            print("wait")
            _reap(workers)
            print("waited")
    _reap(workers)
    print("finished")
    # Taken under the lock: probes that outlived their timeout may still be
    # writing to `results`.
    _dump_results("/dev/shm/crawl.json")
    print("written")


def main(argv):
    """Dispatch the ``crawl`` / ``stat`` subcommands; return the exit status.

    Args:
        argv: Full argument vector (program name first), e.g. ``sys.argv``.

    Returns:
        0 on success, 2 when the command line is unusable.
    """
    if len(argv) < 3 or argv[1] not in ("crawl", "stat"):
        program = argv[0] if argv else "crawler.py"
        print(
            f"usage: {program} crawl <domain-list> | stat <crawl.json> [-t] [-s] [-c]",
            file=sys.stderr,
        )
        return 2
    if argv[1] == "crawl":
        _crawl(argv[2])
    else:
        _report(argv[2], argv)
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))