I've created a handy script, "ntpcert-sweep.py," which can display the complete 
TLS certificate chain from a certificate PEM file or an NTS server to a root 
certificate file on the host machine. The timestamps are probably GMT, the 
hexstrings are X509v3 Subject and Authority Key Identifiers. After including 
some feedback for enhancements, we should include it on the git tree.

```console
$ python ntpclients/ntpcert-sweep.py -H dell-2018.jamesb192.com >stdout 2>stderr
==> stderr <==
until 2024-08-23T00:30:10: 7fea94c7d207e5fe8a4fe044aae34aca63b5a091 -> 
142eb317b75856cbae500940e61faf9d8b14c2c6
until 2025-09-15T16:00:00: 142eb317b75856cbae500940e61faf9d8b14c2c6 -> 
79b459e67bb6e5e40173800888c81a58f6e99b6e
until 2035-06-04T11:04:38: 79b459e67bb6e5e40173800888c81a58f6e99b6e -> 
until 2035-06-04T11:04:38: 79b459e67bb6e5e40173800888c81a58f6e99b6e ->
==> stdout <==
Potential root CA /etc/ssl/certs/ISRG_Root_X1.pem
Potential root CA /etc/ssl/certs/4042bcee.0
```
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright the NTPsec project contributors
# SPDX-License-Identifier: BSD-2-Clause
"""Get expiry and root CA info for a given cert source."""
import argparse
from glob import glob
import os.path
import socket
import ssl
import sys
import urllib.request
from cryptography import x509

CERT_DIR = "/etc/ssl/certs/"


def hexify(b):
    """Return input bytes as a base16 encoded str."""
    return "".join("%02x" % i for i in b)


def get_next_cert(cert, id_old_issuer):
    """Print and advance through certs file

    _if_ their ID matches or we are new."""
    expires = cert.not_valid_after
    try:
        id_issuer = cert.extensions.get_extension_for_class(
            x509.AuthorityKeyIdentifier
        ).value.key_identifier
    except x509.extensions.ExtensionNotFound:
        id_issuer = b""
    try:
        id_subject = cert.extensions.get_extension_for_class(
            x509.SubjectKeyIdentifier
        ).value.digest
    except x509.extensions.ExtensionNotFound:
        id_subject = b""
    if id_old_issuer in [id_subject, None]:
        print(
            "until %04d-%02d-%02dT%02d:%02d:%02d: %s -> %s"
            % (
                expires.year,
                expires.month,
                expires.day,
                expires.hour,
                expires.minute,
                expires.second,
                hexify(id_subject),
                hexify(id_issuer),
            ),
            file=sys.stderr,
        )
        return (b"" == id_issuer, id_issuer)
    return (False, id_old_issuer)


def get_ca_file(id_root):
    """Find and name single cert files

    in _dir_ who are _id_root_."""
    ever_won = False
    for f in list(glob(CERT_DIR + "/**")):
        if os.path.isdir(f):
            # print("Skipping dir:", f, file=sys.stderr)
            continue
        with open(f, "rb") as fp:
            blob = fp.read()

        certs = x509.load_pem_x509_certificates(blob)
        # if 1 < len(certs):
        #     # print("Skipping len:", f, len(certs), file=sys.stderr)
        #     continue
        won = get_next_cert(certs[0], id_root)[0]
        if won:
            print("Potential root CA", f)
            ever_won = True
    return ever_won


def get_subsequent_certs(id_issuer, cert):
    while not get_ca_file(id_issuer):
        aias = cert.extensions.get_extension_for_class(
            x509.AuthorityInformationAccess
        ).value
        my_url = None
        for aia in aias:
            if x509.OID_CA_ISSUERS == aia.access_method:
                my_url = aia.access_location.value
                break
        if my_url is None:
            continue
        req = urllib.request.Request(url=my_url)
        with urllib.request.urlopen(req) as conn:
            certs = x509.load_der_x509_certificate(conn.read())
        id_issuer = get_next_cert(certs, id_issuer)[1]


def get_file_certs(start_file):
    """Get cert chain part from a file, printing the way.."""
    id_cert = None
    cert = None
    with open(start_file, "rb") as fp:
        certs = x509.load_pem_x509_certificates(fp.read())
        for cert in certs:
            id_cert = get_next_cert(cert, id_cert)[1]
    get_subsequent_certs(id_cert, cert)


def get_host1_cert(all_certs_file, host_name, port=4460):
    """Get cert chain from network hosts, printing the way."""
    id_old_issuer = certs = None
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(all_certs_file)
    gais = socket.getaddrinfo(host_name, port, proto=socket.IPPROTO_TCP)
    blob = ""
    for gai in gais:
        with context.wrap_socket(
            socket.socket(gai[0]), server_hostname=host_name
        ) as conn:
            try:
                conn.settimeout(5.1)
                conn.connect(gai[4])
                blob = conn.getpeercert(True)
                break
            except (
                ssl.SSLError,
                socket.error,
                socket.herror,
                socket.gaierror,
                socket.timeout,
            ) as e:
                print(repr(e))
    certs = x509.load_der_x509_certificate(blob)
    id_old_issuer = get_next_cert(certs, None)[1]
    cert = certs[-1] if isinstance(certs, (list, tuple)) else certs
    get_subsequent_certs(id_old_issuer, cert)


if "__main__" == __name__:
    parser = argparse.ArgumentParser(
        prog="cert-sweep",
        epilog="You must specify a host and/or a file name",
        description="Dig and print useful TLS certificate information",
    )
    parser.add_argument(
        "-c",
        "--certsfile",
        dest="cas",
        type=str,
        default="/etc/ssl/certs/ca-certificates.crt",
        help="the file containing all of the root certificates",
    )
    parser.add_argument(
        "-d",
        "--certsdir",
        dest="certdir",
        type=str,
        default=CERT_DIR,
        help="the directory where root certs are located",
    )
    parser.add_argument(
        "-f",
        "--file",
        dest="certfile",
        type=str,
        default=None,
        help="a file to extrapolate from",
    )
    parser.add_argument(
        "-H",
        "--host",
        dest="certhost",
        type=str,
        default=None,
        help="Host to dig info from",
    )
    parser.add_argument(
        "-p",
        "--port",
        dest="port",
        type=int,
        default=4460,
        help="the port to ask on for -H --host certs",
    )
    parser.add_argument(
        "-v",
        "--version",
        action="version",
        version="%(prog)s v0",
        help="Print version and exit",
    )
    options = parser.parse_args()
    if not 0 < options.port < 65536:
        print(
            "Port must be between 1 and 65535 inclusive.",
            file=sys.stderr,
        )
    CERT_DIR = options.certdir
    if not (options.certhost or options.certfile):
        parser.parse_args(["--help"])
        sys.exit(1)
    if options.certhost is not None:
        get_host1_cert(
            options.cas,
            options.certhost,
            options.port,
        )
    if options.certfile is not None:
        if not os.access(options.certfile, os.O_RDONLY):
            print("Can not read file.", file=sys.stderr)
            sys.exit(1)
        get_file_certs(options.certfile)
_______________________________________________
devel mailing list
devel@ntpsec.org
https://lists.ntpsec.org/mailman/listinfo/devel

Reply via email to