URL: https://github.com/freeipa/freeipa/pull/453 Author: tiran Title: #453: Cleanup certdb Action: opened
PR body: """
* use the with statement to open/close files
* prefer fchmod/fchown when a file descriptor is available
* set permissions before data is written to the file
* replace the chdir() hack with a proper cwd argument to ipautil.run()

Do not ever change the working directory of a program. It's a really bad idea. Just consider what is going to happen if two threads, or two different parts of a process, each decide to take control of the working directory.

Signed-off-by: Christian Heimes <chei...@redhat.com>
"""

To pull the PR as a Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/453/head:pr453
git checkout pr453
From 5dcbdc4b3e64f2b4cd466d5cc6a2d2e3040ffc85 Mon Sep 17 00:00:00 2001 From: Christian Heimes <chei...@redhat.com> Date: Thu, 9 Feb 2017 14:55:45 +0100 Subject: [PATCH] Cleanup certdb * use with statement to open/close files * prefer fchmod/fchown when a file descriptor is available * set permission before data is written to file * remove chdir() hack with proper cwd argument to ipautil.run() Do not ever change the working directory of a program. It's a really bad idea. Just consider what is going to happen if two threads or two different parts of a process decide to own control over the working directory? Signed-off-by: Christian Heimes <chei...@redhat.com> --- ipaserver/install/certs.py | 162 +++++++++++++++++++++------------------------ 1 file changed, 74 insertions(+), 88 deletions(-) diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py index 80918d4..beeeb24 100644 --- a/ipaserver/install/certs.py +++ b/ipaserver/install/certs.py @@ -103,10 +103,6 @@ def __init__( self.host_name = host_name self.ca_subject = ca_subject self.subject_base = subject_base - try: - self.cwd = os.getcwd() - except OSError as e: - raise RuntimeError("Unable to determine the current directory: %s" % str(e)) self.cacert_name = get_ca_nickname(self.realm) self.valid_months = "120" @@ -132,10 +128,8 @@ def __init__( def __del__(self): if self.reqdir is not None: shutil.rmtree(self.reqdir, ignore_errors=True) - try: - os.chdir(self.cwd) - except OSError: - pass + self.reqdir = None + self.nssdb.close() def setup_cert_request(self): """ @@ -152,23 +146,26 @@ def setup_cert_request(self): self.certreq_fname = self.reqdir + "/tmpcertreq" self.certder_fname = self.reqdir + "/tmpcert.der" - # When certutil makes a request it creates a file in the cwd, make - # sure we are in a unique place when this happens - os.chdir(self.reqdir) - - def set_perms(self, fname, write=False, uid=None): - if uid: - pent = pwd.getpwnam(uid) - os.chown(fname, pent.pw_uid, pent.pw_gid) + def 
set_perms(self, fname, write=False, user=None): + if user is not None: + pent = pwd.getpwnam(user) + uid, gid = pent.pw_uid, pent.pw_gid else: - os.chown(fname, self.uid, self.gid) + uid, gid = self.uid, self.gid perms = stat.S_IRUSR if write: perms |= stat.S_IWUSR - os.chmod(fname, perms) + if hasattr(fname, 'fileno'): + os.fchown(fname.fileno(), uid, gid) + os.fchmod(fname.fileno(), perms) + else: + os.chown(fname, uid, gid) + os.chmod(fname, perms) def run_certutil(self, args, stdin=None, **kwargs): - return self.nssdb.run_certutil(args, stdin, **kwargs) + # When certutil makes a request it creates a file in the cwd, make + # sure we are in a unique place when this happens + return self.nssdb.run_certutil(args, stdin, cwd=self.reqdir, **kwargs) def run_signtool(self, args, stdin=None): with open(self.passwd_fname, "r") as f: @@ -176,24 +173,23 @@ def run_signtool(self, args, stdin=None): new_args = [paths.SIGNTOOL, "-d", self.secdir, "-p", password] new_args = new_args + args - ipautil.run(new_args, stdin) + ipautil.run(new_args, stdin, cwd=self.reqdir) def create_noise_file(self): if ipautil.file_exists(self.noise_fname): os.remove(self.noise_fname) - f = open(self.noise_fname, "w") - f.write(ipautil.ipa_generate_password()) - self.set_perms(self.noise_fname) + with open(self.noise_fname, "w") as f: + self.set_perms(f) + f.write(ipautil.ipa_generate_password()) def create_passwd_file(self, passwd=None): ipautil.backup_file(self.passwd_fname) - f = open(self.passwd_fname, "w") - if passwd is not None: - f.write("%s\n" % passwd) - else: - f.write(ipautil.ipa_generate_password()) - f.close() - self.set_perms(self.passwd_fname) + with open(self.passwd_fname, "w") as f: + self.set_perms(f) + if passwd is not None: + f.write("%s\n" % passwd) + else: + f.write(ipautil.ipa_generate_password()) def create_certdbs(self): ipautil.backup_file(self.certdb_fname) @@ -232,20 +228,21 @@ def export_ca_cert(self, nickname, create_pkcs12=False): # export the CA cert for use with 
other apps ipautil.backup_file(self.cacert_fname) root_nicknames = self.find_root_cert(nickname)[:-1] - fd = open(self.cacert_fname, "w") - for root in root_nicknames: - result = self.run_certutil(["-L", "-n", root, "-a"], - capture_output=True) - fd.write(result.output) - fd.close() - os.chmod(self.cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + with open(self.cacert_fname, "w") as f: + os.fchmod(f.fileno(), stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + for root in root_nicknames: + result = self.run_certutil(["-L", "-n", root, "-a"], + capture_output=True) + f.write(result.output) + if create_pkcs12: ipautil.backup_file(self.pk12_fname) ipautil.run([paths.PK12UTIL, "-d", self.secdir, "-o", self.pk12_fname, "-n", self.cacert_name, "-w", self.passwd_fname, - "-k", self.passwd_fname]) + "-k", self.passwd_fname], + cwd=self.reqdir) self.set_perms(self.pk12_fname) def load_cacert(self, cacert_fname, trust_flags): @@ -253,9 +250,8 @@ def load_cacert(self, cacert_fname, trust_flags): Load all the certificates from a given file. It is assumed that this file creates CA certificates. 
""" - fd = open(cacert_fname) - certs = fd.read() - fd.close() + with open(cacert_fname) as f: + certs = f.read() st = 0 while True: @@ -336,14 +332,14 @@ def create_server_cert(self, nickname, hostname, other_certdb=None, subject=None if subject is None: subject=DN(('CN', hostname), self.subject_base) self.request_cert(subject, san_dnsnames=[hostname]) - cdb.issue_server_cert(self.certreq_fname, self.certder_fname) - self.import_cert(self.certder_fname, nickname) - fd = open(self.certder_fname, "r") - dercert = fd.read() - fd.close() - - os.unlink(self.certreq_fname) - os.unlink(self.certder_fname) + try: + cdb.issue_server_cert(self.certreq_fname, self.certder_fname) + self.import_cert(self.certder_fname, nickname) + with open(self.certder_fname, "r") as f: + dercert = f.read() + finally: + os.unlink(self.certreq_fname) + os.unlink(self.certder_fname) return dercert @@ -373,10 +369,8 @@ def issue_server_cert(self, certreq_fname, cert_fname): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is nothing else csr = pkcs10.strip_header(csr) @@ -388,9 +382,9 @@ def issue_server_cert(self, certreq_fname, cert_fname): 'xmlOutput': 'true'} # Send the request to the CA - f = open(self.passwd_fname, "r") - password = f.readline() - f.close() + with open(self.passwd_fname, "r") as f: + password = f.readline() + result = dogtag.https_request( self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) @@ -417,9 +411,8 @@ def issue_server_cert(self, certreq_fname, cert_fname): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. 
- f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def issue_signing_cert(self, certreq_fname, cert_fname): self.setup_cert_request() @@ -427,10 +420,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname): if self.host_name is None: raise RuntimeError("CA Host is not set.") - f = open(certreq_fname, "r") - csr = f.readlines() - f.close() - csr = "".join(csr) + with open(certreq_fname, "r") as f: + csr = f.read() # We just want the CSR bits, make sure there is no thing else csr = pkcs10.strip_header(csr) @@ -442,9 +433,9 @@ def issue_signing_cert(self, certreq_fname, cert_fname): 'xmlOutput': 'true'} # Send the request to the CA - f = open(self.passwd_fname, "r") - password = f.readline() - f.close() + with open(self.passwd_fname, "r") as f: + password = f.readline() + result = dogtag.https_request( self.host_name, 8443, "/ca/ee/ca/profileSubmitSSLClient", self.secdir, password, "ipaCert", **params) @@ -463,9 +454,8 @@ def issue_signing_cert(self, certreq_fname, cert_fname): # Write the certificate to a file. It will be imported in a later # step. This file will be read later to be imported. - f = open(cert_fname, "w") - f.write(cert) - f.close() + with open(cert_fname, "w") as f: + f.write(cert) def add_cert(self, cert, nick, flags, pem=False): self.nssdb.add_cert(cert, nick, flags, pem) @@ -488,26 +478,22 @@ def create_pin_file(self): This is the format of Directory Server pin files. """ ipautil.backup_file(self.pin_fname) - f = open(self.pin_fname, "w") - f.write("Internal (Software) Token:") - pwdfile = open(self.passwd_fname) - f.write(pwdfile.read()) - f.close() - pwdfile.close() - self.set_perms(self.pin_fname) + with open(self.pin_fname, "w") as pinfile: + self.set_perms(pinfile) + pinfile.write("Internal (Software) Token:") + with open(self.passwd_fname) as pwdfile: + pinfile.write(pwdfile.read()) def create_password_conf(self): """ This is the format of mod_nss pin files. 
""" ipautil.backup_file(self.pwd_conf) - f = open(self.pwd_conf, "w") - f.write("internal:") - pwdfile = open(self.passwd_fname) - f.write(pwdfile.read()) - f.close() - pwdfile.close() - self.set_perms(self.pwd_conf, uid=constants.HTTPD_USER) + with open(self.pwd_conf, "w") as pwdfile: + self.set_perms(pwdfile, user=constants.HTTPD_USER) + pwdfile.write("internal:") + with open(self.passwd_fname) as pwdfile: + pwdfile.write(pwdfile.read()) def find_root_cert(self, nickname): """ @@ -543,7 +529,8 @@ def export_pkcs12(self, pkcs12_fname, pkcs12_pwd_fname, nickname=None): "-o", pkcs12_fname, "-n", nickname, "-k", self.passwd_fname, - "-w", pkcs12_pwd_fname]) + "-w", pkcs12_pwd_fname], + cwd=self.reqdir) def export_pem_p12(self, pkcs12_fname, pkcs12_pwd_fname, nickname, pem_fname): @@ -556,10 +543,9 @@ def create_from_cacert(self, cacert_fname, passwd=None): if ipautil.file_exists(self.certdb_fname): # We already have a cert db, see if it is for the same CA. # If it is we leave things as they are. - f = open(cacert_fname, "r") - newca = f.readlines() - f.close() - newca = "".join(newca) + with open(cacert_fname, "r") as f: + newca = f.read() + newca, _st = find_cert_from_txt(newca) cacert = self.get_cert_from_db(self.cacert_name)
-- Manage your subscription for the Freeipa-devel mailing list: https://www.redhat.com/mailman/listinfo/freeipa-devel Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code