URL: https://github.com/freeipa/freeipa/pull/177 Author: frasertweedale Title: #177: Add options to write lightweight CA cert or chain to file Action: synchronized
To pull the PR as Git branch: git remote add ghfreeipa https://github.com/freeipa/freeipa git fetch ghfreeipa pull/177/head:pr177 git checkout pr177
From 773304e4f3b68da29251fd0f4971aee936d93020 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <[email protected]> Date: Tue, 16 Aug 2016 13:16:58 +1000 Subject: [PATCH 1/3] Add function for extracting PEM certs from PKCS #7 Add a single function for extracting X.509 certs in PEM format from a PKCS #7 object. Refactor sites that execute ``openssl pkcs7`` to use the new function. Part of: https://fedorahosted.org/freeipa/ticket/6178 --- ipalib/x509.py | 23 +++++++++++++++++- ipapython/certdb.py | 9 ++----- ipaserver/install/cainstance.py | 52 +++++++++++++++-------------------------- 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/ipalib/x509.py b/ipalib/x509.py index e1c3867..caf0ddc 100644 --- a/ipalib/x509.py +++ b/ipalib/x509.py @@ -48,7 +48,9 @@ from ipalib import api from ipalib import util from ipalib import errors +from ipaplatform.paths import paths from ipapython.dn import DN +from ipapython import ipautil if six.PY3: unicode = str @@ -56,7 +58,9 @@ PEM = 0 DER = 1 -PEM_REGEX = re.compile(r'(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)', re.DOTALL) +PEM_REGEX = re.compile( + r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', + re.DOTALL) EKU_SERVER_AUTH = '1.3.6.1.5.5.7.3.1' EKU_CLIENT_AUTH = '1.3.6.1.5.5.7.3.2' @@ -145,6 +149,23 @@ def load_certificate_list_from_file(filename): return load_certificate_list(f.read()) +def pkcs7_to_pems(data, datatype=PEM): + """ + Extract certificates from a PKCS #7 object. + + Return a ``list`` of X.509 PEM strings. + + May throw ``ipautil.CalledProcessError`` on invalid data. + + """ + cmd = [ + paths.OPENSSL, "pkcs7", "-print_certs", + "-inform", "PEM" if datatype == PEM else "DER", + ] + result = ipautil.run(cmd, stdin=data, capture_output=True) + return PEM_REGEX.findall(result.output) + + def is_self_signed(certificate, datatype=PEM): cert = load_certificate(certificate, datatype) return cert.issuer == cert.subject diff --git a/ipapython/certdb.py b/ipapython/certdb.py index c2fe599..a3c8e95 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -272,13 +272,8 @@ def import_files(self, files, db_password_filename, import_keys=False, continue if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'): - args = [ - paths.OPENSSL, 'pkcs7', - '-print_certs', - ] try: - result = ipautil.run( - args, stdin=body, capture_output=True) + certs = x509.pkcs7_to_pems(body) except ipautil.CalledProcessError as e: if label == 'CERTIFICATE': root_logger.warning( @@ -290,7 +285,7 @@ def import_files(self, files, db_password_filename, import_keys=False, filename, line, e) continue else: - extracted_certs += result.output + '\n' + extracted_certs += '\n'.join(certs) + '\n' loaded = True continue diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index 26755ee..a7c740d 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -743,44 +743,30 @@ def __import_ca_chain(self): # makes openssl throw up. data = base64.b64decode(chain) - result = ipautil.run( - [paths.OPENSSL, - "pkcs7", - "-inform", - "DER", - "-print_certs", - ], stdin=data, capture_output=True) - certlist = result.output + certlist = x509.pkcs7_to_pems(data, x509.DER) # Ok, now we have all the certificates in certs, walk through it # and pull out each certificate and add it to our database - st = 1 - en = 0 - subid = 0 ca_dn = DN(('CN','Certificate Authority'), self.subject_base) - while st > 0: - st = certlist.find('-----BEGIN', en) - en = certlist.find('-----END', en+1) - if st > 0: - try: - (chain_fd, chain_name) = tempfile.mkstemp() - os.write(chain_fd, certlist[st:en+25]) - os.close(chain_fd) - (_rdn, subject_dn) = certs.get_cert_nickname(certlist[st:en+25]) - if subject_dn == ca_dn: - nick = get_ca_nickname(self.realm) - trust_flags = 'CT,C,C' - else: - nick = str(subject_dn) - trust_flags = ',,' - self.__run_certutil( - ['-A', '-t', trust_flags, '-n', nick, '-a', - '-i', chain_name] - ) - finally: - os.remove(chain_name) - subid += 1 + for cert in certlist: + try: + chain_fd, chain_name = tempfile.mkstemp() + os.write(chain_fd, cert) + os.close(chain_fd) + (_rdn, subject_dn) = certs.get_cert_nickname(cert) + if subject_dn == ca_dn: + nick = get_ca_nickname(self.realm) + trust_flags = 'CT,C,C' + else: + nick = str(subject_dn) + trust_flags = ',,' + self.__run_certutil( + ['-A', '-t', trust_flags, '-n', nick, '-a', + '-i', chain_name] + ) + finally: + os.remove(chain_name) # Restore NSS trust flags of all previously existing certificates for nick, trust_flags in cert_backup_list: From 313fbf315d5f6aa6b13023e56a3d76c45aa575d0 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <[email protected]> Date: Fri, 18 Nov 2016 10:04:24 +1000 Subject: [PATCH 2/3] certdb: accumulate extracted certs as list of PEMs certdb.NSSDatabase.import_files currently accumulates certificates extracted from input files as a string, which is ugly. Accumulate a list of PEMs instead, and join() them just in time for PKCS #12 creation. Part of: https://fedorahosted.org/freeipa/ticket/6178 --- ipapython/certdb.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/ipapython/certdb.py b/ipapython/certdb.py index a3c8e95..4004579 100644 --- a/ipapython/certdb.py +++ b/ipapython/certdb.py @@ -236,7 +236,7 @@ def import_files(self, files, db_password_filename, import_keys=False, """ key_file = None extracted_key = None - extracted_certs = '' + extracted_certs = [] for filename in files: try: @@ -267,7 +267,7 @@ def import_files(self, files, db_password_filename, import_keys=False, filename, line, e) continue else: - extracted_certs += body + '\n' + extracted_certs.append(body) loaded = True continue @@ -285,7 +285,7 @@ def import_files(self, files, db_password_filename, import_keys=False, filename, line, e) continue else: - extracted_certs += '\n'.join(certs) + '\n' + extracted_certs.extend(certs) loaded = True continue @@ -335,7 +335,7 @@ def import_files(self, files, db_password_filename, import_keys=False, pass else: data = x509.make_pem(base64.b64encode(data)) - extracted_certs += data + '\n' + extracted_certs.append(data) continue # Try to import the file as PKCS#12 file @@ -376,14 +376,15 @@ def import_files(self, files, db_password_filename, import_keys=False, raise RuntimeError( "No server certificates found in %s" % (', '.join(files))) - certs = x509.load_certificate_list(extracted_certs) - for cert in certs: + for cert_pem in extracted_certs: + cert = x509.load_certificate(cert_pem) nickname = str(DN(cert.subject)) data = cert.public_bytes(serialization.Encoding.DER) self.add_cert(data, nickname, ',,') if extracted_key: - in_file = ipautil.write_tmp_file(extracted_certs + extracted_key) + in_file = ipautil.write_tmp_file( + '\n'.join(extracted_certs) + '\n' + extracted_key) out_file = tempfile.NamedTemporaryFile() out_password = ipautil.ipa_generate_password() out_pwdfile = ipautil.write_tmp_file(out_password) From e574e553010be0100615e31ac3541f029e2fd589 Mon Sep 17 00:00:00 2001 From: Fraser Tweedale <[email protected]> Date: Mon, 8 Aug 2016 14:27:20 +1000 Subject: [PATCH 3/3] Add options to write lightweight CA cert or chain to file Administrators need a way to retrieve the certificate or certificate chain of an IPA-managed lightweight CA. Add params to the `ca' object for carrying the CA certificate and chain (as multiple DER values), and add the `--certificate-out' option and `--chain' flag as client-side options for writing one or the other to a file. Fixes: https://fedorahosted.org/freeipa/ticket/6178 --- ipaclient/plugins/ca.py | 62 +++++++++++++++++++++++++++++ ipaserver/plugins/ca.py | 65 +++++++++++++++++++++++++++---- ipaserver/plugins/dogtag.py | 12 ++++++ ipatests/test_xmlrpc/tracker/ca_plugin.py | 44 +++++++++++++++++---- ipatests/test_xmlrpc/xmlrpc_test.py | 36 +++++++++++++++++ 5 files changed, 205 insertions(+), 14 deletions(-) create mode 100644 ipaclient/plugins/ca.py diff --git a/ipaclient/plugins/ca.py b/ipaclient/plugins/ca.py new file mode 100644 index 0000000..83239aa --- /dev/null +++ b/ipaclient/plugins/ca.py @@ -0,0 +1,62 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# + +import base64 +from ipaclient.frontend import MethodOverride +from ipalib import util, x509, Flag, Str +from ipalib.plugable import Registry +from ipalib.text import _ + +register = Registry() + + +class WithCertOutArgs(MethodOverride): + + takes_options = ( + Str( + 'certificate_out?', + doc=_('Write certificate to file'), + include='cli', + cli_metavar='FILE', + ), + Flag( + 'chain', + default=False, + doc=_('Write certificate chain instead of single certificate'), + include='cli', + ), + ) + + def forward(self, *keys, **options): + filename = None + if 'certificate_out' in options: + filename = options.pop('certificate_out') + util.check_writable_file(filename) + chain = options.pop('chain', False) + if chain: + options['all'] = True + + result = super(WithCertOutArgs, self).forward(*keys, **options) + if filename: + def to_pem(x): + return x509.make_pem(x) + if chain: + ders = result['result']['certificate_chain'] + data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders) + else: + data = to_pem(result['result']['certificate']) + with open(filename, 'wb') as f: + f.write(data) + + return result + + +@register(override=True, no_fail=True) +class ca_add(WithCertOutArgs): + pass + + +@register(override=True, no_fail=True) +class ca_show(WithCertOutArgs): + pass diff --git a/ipaserver/plugins/ca.py b/ipaserver/plugins/ca.py index d9ae8c8..89fe88c 100644 --- a/ipaserver/plugins/ca.py +++ b/ipaserver/plugins/ca.py @@ -2,14 +2,21 @@ # Copyright (C) 2016 FreeIPA Contributors see COPYING for license # -from ipalib import api, errors, output, DNParam, Str +import base64 + +import six + +from ipalib import api, errors, output, Bytes, DNParam, Str from ipalib.constants import IPA_CA_CN from ipalib.plugable import Registry from ipaserver.plugins.baseldap import ( LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete, LDAPUpdate, LDAPRetrieve, LDAPQuery, pkey_to_value) -from ipaserver.plugins.cert import ca_enabled_check -from ipalib import _, ngettext +from ipaserver.plugins.cert import BaseCertObject, ca_enabled_check +from ipalib import _, ngettext, x509 + +if six.PY3: + unicode = str __doc__ = _(""" @@ -53,7 +60,7 @@ @register() -class ca(LDAPObject): +class ca(LDAPObject, BaseCertObject): """ Lightweight CA Object """ @@ -70,7 +77,12 @@ class ca(LDAPObject): label = _('Certificate Authorities') label_singular = _('Certificate Authority') - takes_params = ( + takes_params = tuple( + # filter out attrs with an equivalent defined as part of + # this LDAP object + option for option in BaseCertObject.takes_params + if option.name not in {'cacn', 'subject', 'issuer'} + ) + ( Str('cn', primary_key=True, cli_name='name', @@ -100,6 +112,12 @@ class ca(LDAPObject): doc=_('Issuer Distinguished Name'), flags=['no_create', 'no_update'], ), + Bytes( + 'certificate_chain*', + label=_("Certificate chain"), + doc=_("X.509 certificate chain"), + flags={'no_create', 'no_update', 'no_search'}, + ), ) permission_filter_objectclasses = ['ipaca'] @@ -144,6 +162,33 @@ class ca(LDAPObject): }, } + def _parse(self, obj, full=True): + """ + Remove fields defined in ``BaseCertObject`` that are not + used in the ``ca`` object. + + """ + super(ca, self)._parse(obj, full) + for k in {'issuer', 'subject'}: + if k in obj: + del obj[k] + + +def set_certificate_attrs(entry, options): + ca_id = entry['ipacaid'][0] + full = options.get('all', False) + with api.Backend.ra_lightweight_ca as ca_api: + der = ca_api.read_ca_cert(entry['ipacaid'][0]) + entry['certificate'] = unicode(base64.b64encode(der)) + if not options.get('raw', False): + api.Object.ca._parse(entry, full) + + if full: + pkcs7_der = ca_api.read_ca_chain(ca_id) + pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER) + ders = [x509.normalize_certificate(pem) for pem in pems] + entry['certificate_chain'] = ders + @register() class ca_find(LDAPSearch): @@ -161,9 +206,11 @@ def execute(self, *keys, **options): class ca_show(LDAPRetrieve): __doc__ = _("Display the properties of a CA.") - def execute(self, *args, **kwargs): + def execute(self, *keys, **options): ca_enabled_check() - return super(ca_show, self).execute(*args, **kwargs) + result = super(ca_show, self).execute(*keys, **options) + set_certificate_attrs(result['result'], options) + return result @register() @@ -203,6 +250,10 @@ def pre_callback(self, ldap, dn, entry, entry_attrs, *keys, **options): entry['ipacasubjectdn'] = [resp['dn']] return dn + def post_callback(self, ldap, dn, entry_attrs, *keys, **options): + set_certificate_attrs(entry_attrs, options) + return dn + @register() class ca_del(LDAPDelete): diff --git a/ipaserver/plugins/dogtag.py b/ipaserver/plugins/dogtag.py index 0bdb4da..b77b21a 100644 --- a/ipaserver/plugins/dogtag.py +++ b/ipaserver/plugins/dogtag.py @@ -2125,6 +2125,18 @@ def read_ca(self, ca_id): except: raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON")) + def read_ca_cert(self, ca_id): + _status, _resp_headers, resp_body = self._ssldo( + 'GET', '{}/cert'.format(ca_id), + headers={'Accept': 'application/pkix-cert'}) + return resp_body + + def read_ca_chain(self, ca_id): + _status, _resp_headers, resp_body = self._ssldo( + 'GET', '{}/chain'.format(ca_id), + headers={'Accept': 'application/pkcs7-mime'}) + return resp_body + def disable_ca(self, ca_id): self._ssldo( 'POST', ca_id + '/disable', diff --git a/ipatests/test_xmlrpc/tracker/ca_plugin.py b/ipatests/test_xmlrpc/tracker/ca_plugin.py index ec58c28..a9a8e97 100644 --- a/ipatests/test_xmlrpc/tracker/ca_plugin.py +++ b/ipatests/test_xmlrpc/tracker/ca_plugin.py @@ -8,7 +8,17 @@ from ipapython.dn import DN from ipatests.test_xmlrpc.tracker.base import Tracker from ipatests.util import assert_deepequal -from ipatests.test_xmlrpc.xmlrpc_test import fuzzy_issuer, fuzzy_caid +from ipatests.test_xmlrpc.xmlrpc_test import ( + fuzzy_issuer, + fuzzy_caid, + fuzzy_base64, + fuzzy_positive_int, + fuzzy_positive_int_hex, + fuzzy_cert_validity, + fuzzy_hex_with_colons, + fuzzy_sequence_of, + fuzzy_bytes, +) from ipatests.test_xmlrpc import objectclasses @@ -19,12 +29,24 @@ class CATracker(Tracker): """Implementation of a Tracker class for CA plugin.""" - retrieve_keys = { + ldap_keys = { 'dn', 'cn', 'ipacaid', 'ipacasubjectdn', 'ipacaissuerdn', 'description' } - retrieve_all_keys = {'objectclass'} | retrieve_keys - create_keys = retrieve_all_keys - update_keys = retrieve_keys - {'dn'} + cert_keys = { + 'certificate', + 'serial_number', 'serial_number_hex', + 'valid_not_before', 'valid_not_after', + } + cert_all_keys = { + 'certificate_chain', + 'md5_fingerprint', 'sha1_fingerprint', + } + find_keys = ldap_keys + find_all_keys = {'objectclass'} | ldap_keys + retrieve_keys = ldap_keys | cert_keys + retrieve_all_keys = {'objectclass'} | retrieve_keys | cert_all_keys + create_keys = {'objectclass'} | retrieve_keys + update_keys = ldap_keys - {'dn'} def __init__(self, name, subject, desc=u"Test generated CA", default_version=None): @@ -59,6 +81,14 @@ def track_create(self): ipacasubjectdn=[self.ipasubjectdn], ipacaissuerdn=[fuzzy_issuer], ipacaid=[fuzzy_caid], + certificate=fuzzy_base64, + serial_number=fuzzy_positive_int, + serial_number_hex=fuzzy_positive_int_hex, + valid_not_before=fuzzy_cert_validity, + valid_not_after=fuzzy_cert_validity, + md5_fingerprint=fuzzy_hex_with_colons, + sha1_fingerprint=fuzzy_hex_with_colons, + certificate_chain=fuzzy_sequence_of(fuzzy_bytes), objectclass=objectclasses.ca ) self.exists = True @@ -102,9 +132,9 @@ def make_find_command(self, *args, **kwargs): def check_find(self, result, all=False, raw=False): """Check the plugin's `find` command result""" if all: - expected = self.filter_attrs(self.retrieve_all_keys) + expected = self.filter_attrs(self.find_all_keys) else: - expected = self.filter_attrs(self.retrieve_keys) + expected = self.filter_attrs(self.find_keys) assert_deepequal(dict( count=1, diff --git a/ipatests/test_xmlrpc/xmlrpc_test.py b/ipatests/test_xmlrpc/xmlrpc_test.py index 0ce1245..f42656f 100644 --- a/ipatests/test_xmlrpc/xmlrpc_test.py +++ b/ipatests/test_xmlrpc/xmlrpc_test.py @@ -22,6 +22,7 @@ """ from __future__ import print_function +import collections import datetime import inspect @@ -41,6 +42,8 @@ uuid_re = '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}' +hexdigit_re = '[0-9A-Fa-f]' + # Matches an ipauniqueid like u'784d85fd-eae7-11de-9d01-54520012478b' fuzzy_uuid = Fuzzy('^%s$' % uuid_re) @@ -49,6 +52,37 @@ '^cn=%s,cn=automember rebuild membership,cn=tasks,cn=config$' % uuid_re ) +# base64-encoded value +fuzzy_base64 = Fuzzy('^[0-9A-Za-z/+]+={0,2}$') + + +# certificate not-before or not-after date string +def fuzzy_cert_validity_test(s): + try: + datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y %Z") + return True + except ValueError: + return False + +fuzzy_cert_validity = Fuzzy(test=fuzzy_cert_validity_test) + +fuzzy_positive_int = Fuzzy(type=int, test=lambda x: x > 0) + +fuzzy_positive_int_hex = Fuzzy('^0x{}+$'.format(hexdigit_re)) + +fuzzy_hex_with_colons = Fuzzy('^{0}{{2}}(:{0}{{2}})*$'.format(hexdigit_re)) + + +def fuzzy_sequence_of(fuzzy): + """Construct a Fuzzy for a Sequence of values matching the given Fuzzy.""" + def test(xs): + if not isinstance(xs, collections.Sequence): + return False + else: + return all(fuzzy == x for x in xs) + + return Fuzzy(test=test) + # Matches an automember task finish message fuzzy_automember_message = Fuzzy( '^Automember rebuild task finished\. Processed \(\d+\) entries\.$' @@ -109,6 +143,8 @@ # match any string fuzzy_string = Fuzzy(type=six.string_types) +fuzzy_bytes = Fuzzy(type=bytes) + # case insensitive match of sets def fuzzy_set_ci(s): return Fuzzy(test=lambda other: set(x.lower() for x in other) == set(y.lower() for y in s))
-- Manage your subscription for the Freeipa-devel mailing list: https://www.redhat.com/mailman/listinfo/freeipa-devel Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code
