From: ale Date: Mon, 6 Feb 2012 19:02:45 +0000 (+0000) Subject: upgrade to CAM v2.0 X-Git-Url: https://v.licheni.net/stack/cam.git/commitdiff_plain/112c04e3926d62291efd902a1dcb0b2d24feeb59 upgrade to CAM v2.0 --- diff --git a/README b/README new file mode 100644 index 0000000..0e92373 --- /dev/null +++ b/README @@ -0,0 +1,67 @@ + +cam - minimal X509 Certification Authority management +===================================================== + +`cam` is a tiny Python program that can be used to manage a X509 +certification authority for a small organization. It can only create +server certificates, so this is not going to be useful to manage an +X509-based client authentication infrastructure. + +The intended usage involves describing the list of certificates to +generate in a configuration file, and using the `cam' tool to create +and renew them. + + +Configuration +------------- + +The configuration file uses INI-like syntax, consisting of a number of +sections. There are two special sections: `ca` and `global`, any other +section is interpreted as a certificate definition. + +The `ca` section contains the attributes of the CA itself, see the +example configuration file to see which attributes are supported. + +The `global` section contains configuration parameters for `cam`. The +only configuration parameter supported is `root_dir`, which is where all +the CA private data will be stored. If you leave this parameter empty, +or if you don't define a `global` section at all, this will default to +the directory containing the configuration file. + +Certificates are intentified by a ''tag'', (the section name), so for +example given the following configuration snippet:: + + [web] + cn = www.domain.org + +you would use the following command to generate it:: + + $ cam --config=my.config gen web + +Certificates and private keys are saved within the CA data directory, +you can obtain their path with:: + + $ cam --config=my.config files web + /your/ca/dir/public/certs/web.pem + /your/ca/dir/private/web.key + + +Installation +------------ + +The CA private keys are very sensitive information, so you'll want to +store them in some encrypted removable storage. You can bundle the `cam` +application itself with the CA data by using `virtualenv`:: + + $ virtualenv --no-site-packages /secure/cam + $ virtualenv --relocatable /secure/cam + $ (cd /tmp ; git clone http://git.autistici.org/cam.git \ + && /secure/cam/bin/python setup.py install) + +Then you can simply mount your encrypted image wherever there is a +Python interpreter available (well, with the same architecture/OS too) +and run:: + + $ /secure/cam/bin/cam --config=/secure/ca/my.config ... + + diff --git a/cam b/cam deleted file mode 100755 index 7dd9eaa..0000000 --- a/cam +++ /dev/null @@ -1,84 +0,0 @@ -#!/usr/bin/python - - -import os, sys -import logging -sys.path.insert(0, os.path.join(os.path.dirname(sys.argv[0]), 'lib')) - -from utils import * -from cfg import * - -# commands -from gen import gen -from newca import newca -from list import list -from files import files -from initfs import initfs -from check import check - - -def Usage(): - print ''' -CAM v0.1 - (c)2006 by -A Certification Authority manager for complex situations. -Usage: %s [...] -Known commands: - - init - Initialize the environment by creating the necessary - directory structure - - newca [ []] - Create a new CA certificate (otherwise you can import - your own certificates) - - gen ... - Create (or re-create) the certificates corresponding - to TAG - - list - List all known certificates - - files ... - Dump all the certificate-related files of this TAG - - check - Should be run weekly from a cron job to warn you if - some certificates are about to expire (controlled by - the 'warning_days' parameter in the 'global' section - of the configuration) - - -The configuration will be read from '%s'. -It consists of a ini-style file, with one 'ca' section that -specifies global CA parameters, and more sections for each -tag with certificate-specific information. See the examples -for more details on how to write your own configuration. - -''' % (sys.argv[0], config_file_path) - sys.exit(0) - - -if len(sys.argv) < 2 or sys.argv[1] == 'help': - Usage() - -cmd = sys.argv[1] -if cmd == 'init': - initfs() -elif cmd == 'gen': - for tag in sys.argv[2:]: - gen(tag) -elif cmd == 'newca': - newca() -elif cmd == 'list': - list(sys.argv[2:]) -elif cmd == 'files': - for tag in sys.argv[2:]: - files(tag) -elif cmd == 'check': - check() -else: - print 'Unknown command \'%s\'.' % cmd - Usage() - - diff --git a/cam/__init__.py b/cam/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cam/ca.py b/cam/ca.py new file mode 100644 index 0000000..a14075c --- /dev/null +++ b/cam/ca.py @@ -0,0 +1,175 @@ +import errno +import fcntl +import logging +import re +import os +import getpass +import random +import shutil +import tempfile +import time +from cam import openssl_wrap +from cam import utils + +log = logging.getLogger(__name__) + + +class _CAFiles(object): + + def __init__(self, basedir, **attrs): + for key, value in attrs.items(): + setattr(self, key, os.path.join(basedir, value)) + + +class CA(object): + + def __init__(self, basedir, config, password=None): + self._pw = password + self.basedir = basedir + self.config = {'basedir': basedir, 'default_days': '365', 'ou': 'CA', + 'days': '3650', 'country': 'XX', 'crl_url': '', + 'bits': '4096'} + self.config.update(config) + self.files = _CAFiles(basedir, + conf='conf/ca.conf', + public_key='public/ca.pem', + private_key='private/ca.key', + crl='public/crl.pem', + serial='serial', + crlnumber='crlnumber', + index='index') + self._lock() + + def _getpw(self): + if self._pw is None: + self._pw = getpass.getpass(prompt='CA Password: ') + return self._pw + + def _lock(self): + self._lockfd = open(os.path.join(self.basedir, '_lock'), 'w+') + n = 3 + while True: + try: + fcntl.lockf(self._lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB) + break + except IOError, e: + if e.errno in (errno.EACCES, errno.EAGAIN): + n -= 1 + if n == 0: + log.error('another instance is running') + raise + time.sleep(1) + continue + raise + + def _unlock(self): + fcntl.lockf(self._lockfd, fcntl.LOCK_UN) + self._lockfd.close() + + def close(self): + self._unlock() + + def create(self): + old_umask = os.umask(077) + + for pathext in ('', 'conf', 'public', 'public/certs', + 'public/crl', 'private', 'newcerts'): + fullpath = os.path.join(self.basedir, pathext) + if not os.path.isdir(fullpath): + os.mkdir(fullpath) + + if not os.path.exists(self.files.index): + log.info('creating new index file') + open(self.files.index, 'w').close() + + if not os.path.exists(self.files.serial): + serial = random.randint(1, 1000000000) + log.info('initializing serial number (%d)', serial) + with open(self.files.serial, 'w') as fd: + fd.write('%08X\n' % serial) + + if not os.path.exists(self.files.crlnumber): + with open(self.files.crlnumber, 'w') as fd: + fd.write('01\n') + + # Create the OpenSSL configuration file. + utils.render(self.files.conf, 'openssl_config', self.config) + + # Generate keys if they do not exist. + if not os.path.exists(self.files.public_key): + tmpdir = tempfile.mkdtemp() + csr_file = os.path.join(tmpdir, 'ca.csr') + log.info('creating new RSA CA CSR') + openssl_wrap.run_with_config( + self.files.conf, 'req', '-new', + '-passout', 'pass:%s' % self._getpw(), + '-keyout', self.files.private_key, '-out', csr_file) + log.info('self-signing RSA CA certificate') + openssl_wrap.run_with_config( + self.files.conf, 'ca', '-keyfile', self.files.private_key, + '-key', self._getpw(), + '-extensions', 'v3_ca', '-out', self.files.public_key, + '-days', self.config.get('days', self.config['default_days']), + '-selfsign', '-infiles', csr_file) + shutil.rmtree(tmpdir) + + os.umask(old_umask) + + # Make some files public. + for path in (os.path.join(self.basedir, 'public'), + os.path.join(self.basedir, 'public/certs'), + self.files.public_key): + if os.path.isdir(path): + os.chmod(path, 0755) + else: + os.chmod(path, 0644) + + def gencrl(self): + log.info('generating CRL') + openssl_wrap.run_with_config( + self.files.conf, 'ca', '-gencrl', '-out', self.files.crl, + '-key', self._getpw()) + os.chmod(self.files.crl, 0644) + + def revoke(self, cert): + log.info('revoking certificate %s', cert.name) + openssl_wrap.run_with_config( + self.files.conf, 'ca', '-revoke', cert.public_key_file, + '-key', self._getpw()) + self.gencrl() + + def generate(self, cert): + expiry = cert.get_expiration_date() + if expiry and expiry > time.time(): + log.warn('certificate is still valid, revoking previous version') + self.revoke(cert) + + log.info('generating new certificate %s', cert.name) + tmpdir = tempfile.mkdtemp() + try: + csr_file = os.path.join(tmpdir, '%s.csr' % cert.name) + conf_file = os.path.join(tmpdir, '%s.conf' % cert.name) + ext_file = os.path.join(tmpdir, '%s-ext.conf' % cert.name) + conf = {} + conf.update(self.config) + conf['cn'] = cert.cn + conf['days'] = cert.days or self.config['default_days'] + if cert.ou: + conf['ou'] = cert.ou + conf['alt_names'] = ''.join( + ['DNS.%d=%s\n' % (idx + 1, x) + for idx, x in enumerate(cert.alt_names)]) + utils.render(conf_file, 'openssl_config', conf) + utils.render(ext_file, 'ext_config', conf) + openssl_wrap.run_with_config( + conf_file, 'req', '-new', '-keyout', cert.private_key_file, + '-nodes', '-out', csr_file) + os.chmod(cert.private_key_file, 0600) + openssl_wrap.run_with_config( + conf_file, 'ca', '-days', conf['days'], + '-key', self._getpw(), + '-policy', 'policy_anything', '-out', cert.public_key_file, + '-extfile', ext_file, '-infiles', csr_file) + finally: + shutil.rmtree(tmpdir) + diff --git a/cam/cert.py b/cam/cert.py new file mode 100644 index 0000000..9254fb0 --- /dev/null +++ b/cam/cert.py @@ -0,0 +1,58 @@ +import os +import re +import string +import time +from cam import openssl_wrap + + +def _parse_alt_names(s): + if not s: + return [] + if ',' in s: + parts = s.split(',') + else: + parts = s.split() + return [x.strip() for x in parts if x] + + +class Cert(object): + + def __init__(self, ca, name, config): + self.name = name + self.ca = ca + self.cn = config['cn'] + self.ou = config.get('ou', '') + self.days = config.get('days') + + self.alt_names = _parse_alt_names(config.get('alt_names')) + if self.cn not in self.alt_names: + self.alt_names.insert(0, self.cn) + self.public_key_file = os.path.join(ca.basedir, 'public', 'certs', + '%s.pem' % name) + self.private_key_file = os.path.join(ca.basedir, 'private', + '%s.key' % name) + + def get_fingerprint(self, digest='sha1'): + if os.path.exists(self.public_key_file): + output = openssl_wrap.run('x509', '-in', self.public_key_file, + '-noout', '-fingerprint', '-%s' % digest) + m = re.search(r'=(.*)$', output) + if m: + return m.group(1) + return None + + def get_expiration_date(self): + if os.path.exists(self.public_key_file): + output = openssl_wrap.run('x509', '-in', self.public_key_file, + '-noout', '-dates') + m = re.search(r'notAfter=(.*)', output) + if m: + return time.mktime(time.strptime(m.group(1), + '%b %d %H:%M:%S %Y %Z')) + return None + + def expired(self): + now = time.time() + return self.get_expiration_date() > now + + diff --git a/cam/config.py b/cam/config.py new file mode 100644 index 0000000..4e73a2b --- /dev/null +++ b/cam/config.py @@ -0,0 +1,26 @@ +import ConfigParser +import os +from cam import cert +from cam import ca + + +class ConfigError(Exception): + pass + + +def read_config(filename): + parser = ConfigParser.ConfigParser() + if not parser.read(filename): + raise ConfigError('File not found: %s' % filename) + root_dir = os.path.dirname(os.path.abspath(filename)) + global_config = {} + if parser.has_section('global'): + global_config = dict(parser.items('global')) + root_dir = global_config.get('root_dir', root_dir) + ca_obj = ca.CA(root_dir, dict(parser.items('ca'))) + certs = [] + for section in parser.sections(): + if section in ('ca', 'global'): + continue + certs.append(cert.Cert(ca_obj, section, dict(parser.items(section)))) + return global_config, ca_obj, certs diff --git a/cam/main.py b/cam/main.py new file mode 100755 index 0000000..2e80029 --- /dev/null +++ b/cam/main.py @@ -0,0 +1,112 @@ +#!/usr/bin/python + +import logging +import optparse +import os +import sys +import time +from cam import config + + +USAGE = '''cam [] [...] +CAM v2.0 - (c)2012 by +A Certification Authority manager for complex situations. + +Known commands: + + init [ []] + Initialize the environment and create a new CA certificate + (you can also import your own existing certificates) + + gen ... + Create (or re-create) the certificates corresponding + to TAG + + gencrl + Update the CRL + + list + List all known certificates + + files ... + Dump all the certificate-related files of this TAG + + check + Should be run weekly from a cron job to warn you if some + certificates are about to expire (controlled by the 'warning_days' + parameter in the 'global' section of the configuration) + +The configuration file consists of a ini-style file, with one 'ca' +section that specifies global CA parameters, and more sections for +each tag with certificate-specific information. See the examples for +more details on how to write your own configuration. +''' + + +def find_cert(certs, name): + for c in certs: + if c.name == name: + return c + raise Exception('Certificate "%s" not found' % name) + + +def main(): + parser = optparse.OptionParser(usage=USAGE) + parser.add_option('-d', '--debug', dest='debug', help='Be verbose', + action='store_true') + parser.add_option('-c', '--config', dest='config', help='Config file') + opts, args = parser.parse_args() + if not opts.config: + parser.error('Must specify --config') + if len(args) < 1: + parser.error('Must specify a command') + + logging.basicConfig() + logging.getLogger().setLevel(opts.debug and logging.DEBUG or logging.INFO) + + global_config, ca, certs = config.read_config(opts.config) + + cmd, args = args[0], args[1:] + + try: + if cmd == 'init': + ca.create() + elif cmd == 'gen': + if len(args) != 1: + parser.error('Wrong number of arguments') + ca.generate(find_cert(certs, args[0])) + elif cmd == 'gencrl': + ca.gencrl() + elif cmd == 'files': + if len(args) != 1: + parser.error('Wrong number of arguments') + c = find_cert(certs, args[0]) + print c.public_key_file + print c.private_key_file + elif cmd == 'list': + for cert in sorted(certs, key=lambda x: x.name): + print cert.name, cert.cn, cert.get_expiration_date() + elif cmd == 'check': + now = time.time() + warning_time = 8640000 * int(global_config.get('warning_days', 15)) + for cert in certs: + exp = cert.get_expiration_date() + if exp and (exp - now) < warning_time: + print '%s (%s) is about to expire.' % (cert.name, cert.cn) + else: + parser.error('unknown command "%s"' % cmd) + finally: + ca.close() + + +def main_wrapper(): + try: + main() + return 0 + except Exception, e: + logging.exception('uncaught exception') + return 1 + + +if __name__ == '__main__': + sys.exit(main_wrapper()) diff --git a/cam/openssl_wrap.py b/cam/openssl_wrap.py new file mode 100644 index 0000000..db44f26 --- /dev/null +++ b/cam/openssl_wrap.py @@ -0,0 +1,26 @@ +import logging +import subprocess + +log = logging.getLogger(__name__) + + +class CommandError(Exception): + pass + + +def run(*args): + cmd = ['openssl'] + cmd.extend(args) + log.debug('executing "%s"' % ' '.join(cmd)) + pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE) + stdout, _ = pipe.communicate() + if pipe.returncode != 0: + raise CommandError('openssl exited with status %d' % ( + pipe.returncode,)) + return stdout + + +def run_with_config(config_file, *args): + cmd = args[0] + args = args[1:] + return run(cmd, '-config', config_file, '-batch', *args) diff --git a/cam/templates/ext_config b/cam/templates/ext_config new file mode 100644 index 0000000..8a193b5 --- /dev/null +++ b/cam/templates/ext_config @@ -0,0 +1,19 @@ +basicConstraints = CA:false +nsCertType = client, server +keyUsage = nonRepudiation, digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth, serverAuth +nsComment = "%(cn)s" +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid, issuer:always +subjectAltName = @subject_alt_name +issuerAltName = issuer:copy +nsCaRevocationUrl = %(crl_url)s +nsRevocationUrl = %(crl_url)s +crlDistributionPoints = @cdp_section + +[ subject_alt_name ] +%(alt_names)s +email = copy + +[ cdp_section ] +URI.1 = %(crl_url)s diff --git a/lib/templates.py b/cam/templates/openssl_config similarity index 74% rename from lib/templates.py rename to cam/templates/openssl_config index b2e02cf..4583fca 100644 --- a/lib/templates.py +++ b/cam/templates/openssl_config @@ -1,15 +1,14 @@ - -openssl_conf_template = ''' -RANDFILE = %(ca_dir)s/.random +RANDFILE = %(basedir)s/.random [ ca ] default_ca = CA_default [ CA_default ] -dir = %(ca_dir)s +dir = %(basedir)s certs = $dir/public/certs crl_dir = $dir/public/crl crl = $dir/public/crl.pem +crlnumber = $dir/crlnumber database = $dir/index serial = $dir/serial new_certs_dir = $dir/newcerts @@ -38,7 +37,7 @@ commonName = supplied emailAddress = optional [ req ] -default_bits = 4096 +default_bits = %(bits)s default_md = sha1 distinguished_name = req_distinguished_name attributes = req_attributes @@ -76,27 +75,3 @@ nsComment = "%(cn)s" subjectAltName = email:copy issuerAltName = issuer:copy -''' - -ext_template = ''' -basicConstraints = CA:false -nsCertType = client, server -keyUsage = nonRepudiation, digitalSignature, keyEncipherment -extendedKeyUsage = clientAuth, serverAuth -nsComment = "%(ca_name)s" -subjectKeyIdentifier = hash -authorityKeyIdentifier = keyid, issuer:always -subjectAltName = @subject_alt_name -issuerAltName = issuer:copy -nsCaRevocationUrl = %(ca_base_url)s/crl.pem -nsRevocationUrl = %(ca_base_url)s/crl.pem -crlDistributionPoints = @cdp_section - -[ subject_alt_name ] -%(alt_names)s -email = copy - -[ cdp_section ] -URI.1 = %(ca_base_url)s/crl.pem -''' - diff --git a/cam/tests/__init__.py b/cam/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cam/tests/test_ca.py b/cam/tests/test_ca.py new file mode 100644 index 0000000..c443fe4 --- /dev/null +++ b/cam/tests/test_ca.py @@ -0,0 +1,57 @@ +import logging +import os +import tempfile +import shutil +import unittest +from cam import ca +from cam import openssl_wrap + + +logging.basicConfig(level=logging.DEBUG) + + +class CertStub(object): + + def __init__(self, name, cn, tmpdir): + self.name = name + self.cn = cn + self.alt_names = [cn] + self.ou = None + self.days = '365' + self.public_key_file = os.path.join(tmpdir, '%s.pub' % name) + self.private_key_file = os.path.join(tmpdir, '%s.priv' % name) + + def get_expiration_date(self): + return 123456789 + + +class CATest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.ca = ca.CA(self.tmpdir, + {'cn': 'test ca', 'org': 'test', + 'bits': '1024', 'email': 'test@test.com'}, + password='testpw') + + def tearDown(self): + self.ca.close() + shutil.rmtree(self.tmpdir) + + def test_create(self): + self.ca.create() + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'conf/ca.conf'))) + + def test_create_cert(self): + self.ca.create() + cert = CertStub('test', 'www.test.com', self.tmpdir) + self.ca.generate(cert) + self.assertTrue(os.path.exists(cert.public_key_file)) + self.assertTrue(os.path.exists(cert.private_key_file)) + + def test_revoke(self): + self.ca.create() + cert = CertStub('test', 'www.test.com', self.tmpdir) + self.ca.generate(cert) + self.ca.revoke(cert) + self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'public/crl.pem'))) diff --git a/cam/tests/test_cert.py b/cam/tests/test_cert.py new file mode 100644 index 0000000..a7e791f --- /dev/null +++ b/cam/tests/test_cert.py @@ -0,0 +1,86 @@ +import os +import tempfile +import time +import shutil +import unittest +from cam import cert + + +TEST_PEM = '''-----BEGIN CERTIFICATE----- +MIICNDCCAaECEAKtZn5ORf5eV288mBle3cAwDQYJKoZIhvcNAQECBQAwXzELMAkG +A1UEBhMCVVMxIDAeBgNVBAoTF1JTQSBEYXRhIFNlY3VyaXR5LCBJbmMuMS4wLAYD +VQQLEyVTZWN1cmUgU2VydmVyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MB4XDTk0 +MTEwOTAwMDAwMFoXDTEwMDEwNzIzNTk1OVowXzELMAkGA1UEBhMCVVMxIDAeBgNV +BAoTF1JTQSBEYXRhIFNlY3VyaXR5LCBJbmMuMS4wLAYDVQQLEyVTZWN1cmUgU2Vy +dmVyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MIGbMA0GCSqGSIb3DQEBAQUAA4GJ +ADCBhQJ+AJLOesGugz5aqomDV6wlAXYMra6OLDfO6zV4ZFQD5YRAUcm/jwjiioII +0haGN1XpsSECrXZogZoFokvJSyVmIlZsiAeP94FZbYQHZXATcXY+m3dM41CJVphI +uR2nKRoTLkoRWZweFdVJVCxzOmmCsZc5nG1wZ0jl3S3WyB57AgMBAAEwDQYJKoZI +hvcNAQECBQADfgBl3X7hsuyw4jrg7HFGmhkRuNPHoLQDQCYCPgmc4RKz0Vr2N6W3 +YQO2WxZpO8ZECAyIUwxrl0nHPjXcbLm7qt9cuzovk2C2qUtN8iD3zV9/ZHuO3ABc +1/p3yjkWWW8O6tO1g39NTUJWdrTJXwT4OPjr0l91X817/OWOgHz8UA== +-----END CERTIFICATE----- +''' + +TEST_SHA1 = '44:63:C5:31:D7:CC:C1:00:67:94:61:2B:B6:56:D3:BF:82:57:84:6F' +TEST_MD5 = '74:7B:82:03:43:F0:00:9E:6B:B3:EC:47:BF:85:A5:93' +TEST_EXPIRY = 1262908799.0 + + +class CAStub(object): + + def __init__(self, basedir): + self.basedir = basedir + + +class CertTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + os.makedirs(os.path.join(self.tmpdir, 'public', 'certs')) + self.ca = CAStub(self.tmpdir) + self.crt_file = os.path.join(self.tmpdir, 'public', 'certs', 'test.pem') + fd = open(self.crt_file, 'w') + fd.write(TEST_PEM) + fd.close() + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def test_get_fingerprint(self): + crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'}) + md5 = crt.get_fingerprint('md5') + self.assertEquals(TEST_MD5, md5) + sha1 = crt.get_fingerprint('sha1') + self.assertEquals(TEST_SHA1, sha1) + + def test_get_fingerprint_nonexist(self): + crt = cert.Cert(self.ca, 'test-nonexist', {'cn': 'test.com'}) + result = crt.get_fingerprint('md5') + self.assertEquals(None, result) + + def test_cn_in_alt_names(self): + crt = cert.Cert(self.ca, 'test', {'cn': 'test.com', + 'alt_names': 'test2.com'}) + self.assert_('test.com' in crt.alt_names) + + def test_get_expiration_date(self): + crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'}) + exp = crt.get_expiration_date() + self.assertEquals(TEST_EXPIRY, exp) + + def test_get_expiration_date_nonexist(self): + crt = cert.Cert(self.ca, 'test-nonexist', {'cn': 'test.com'}) + exp = crt.get_expiration_date() + self.assertEquals(None, exp) + + def test_expired(self): + crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'}) + exp = crt.get_expiration_date() + now = time.time() + is_expired = (exp > now) + self.assertEquals(is_expired, crt.expired()) + + +if __name__ == '__main__': + unittest.main() diff --git a/cam/tests/test_config.py b/cam/tests/test_config.py new file mode 100644 index 0000000..174df09 --- /dev/null +++ b/cam/tests/test_config.py @@ -0,0 +1,51 @@ +import mox +import os +import tempfile +import unittest +from cam import config + + +class ConfigTest(unittest.TestCase): + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.mox = mox.Mox() + + def tearDown(self): + self.mox.VerifyAll() + self.mox.UnsetStubs() + os.system("rm -fr '%s'" % self.tmpdir) + + def test_read_config(self): + test_cfg = ''' +[global] +root_dir = root + +[ca] +something = else + +[cert1] +cn = test.com +''' + cf_file = os.path.join(self.tmpdir, 'config') + cf_fd = open(cf_file, 'w') + cf_fd.write(test_cfg) + cf_fd.close() + self.mox.StubOutWithMock(config.ca, 'CA', use_mock_anything=True) + config.ca.CA('root', {'something': 'else'}).AndReturn('ca') + self.mox.StubOutWithMock(config.cert, 'Cert', use_mock_anything=True) + config.cert.Cert('ca', 'cert1', {'cn': 'test.com'}).AndReturn('cert1') + self.mox.ReplayAll() + + global_config, ca_obj, certs = config.read_config(cf_file) + self.assertEquals('ca', ca_obj) + self.assertEquals(['cert1'], certs) + + def test_read_config_nonexist(self): + def f(): + config.read_config('nonexist.conf') + self.assertRaises(config.ConfigError, f) + + +if __name__ == '__main__': + unittest.main() diff --git a/cam/tests/test_openssl_wrap.py b/cam/tests/test_openssl_wrap.py new file mode 100644 index 0000000..6392e1b --- /dev/null +++ b/cam/tests/test_openssl_wrap.py @@ -0,0 +1,56 @@ +import os +import unittest +import subprocess +import mox +from cam import openssl_wrap + + +class PopenStub(object): + + def __init__(self, stdout='', returncode=0): + self.stdout = stdout + self.returncode = returncode + + def communicate(self): + return self.stdout, 'unused' + + +class OpensslWrapTest(unittest.TestCase): + + def setUp(self): + self.mox = mox.Mox() + + def tearDown(self): + self.mox.VerifyAll() + self.mox.UnsetStubs() + + def test_run(self): + self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) + pipe_stub = PopenStub(stdout='output') + subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE + ).AndReturn(pipe_stub) + self.mox.ReplayAll() + result = openssl_wrap.run('test') + self.assertEquals('output', result) + + def test_run_fails(self): + self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) + pipe_stub = PopenStub(returncode=1) + subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE + ).AndReturn(pipe_stub) + self.mox.ReplayAll() + def r(): + result = openssl_wrap.run('test') + self.assertRaises(openssl_wrap.CommandError, r) + + def test_run_with_config(self): + self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True) + pipe_stub = PopenStub(stdout='output') + subprocess.Popen(['openssl', 'test', '-config', 'conf', '-batch', 'arg'], + stdout=subprocess.PIPE).AndReturn(pipe_stub) + self.mox.ReplayAll() + result = openssl_wrap.run_with_config('conf', 'test', 'arg') + + +if __name__ == '__main__': + unittest.main() diff --git a/cam/tests/test_utils.py b/cam/tests/test_utils.py new file mode 100644 index 0000000..e583622 --- /dev/null +++ b/cam/tests/test_utils.py @@ -0,0 +1,38 @@ +import os +import unittest +import tempfile +from cam import utils + + +class UtilsTest(unittest.TestCase): + + def setUp(self): + self.old_template_dir = utils._template_dir + self.tmpdir = tempfile.mkdtemp() + utils._template_dir = self.tmpdir + + def tearDown(self): + os.system('rm -fr "%s"' % self.tmpdir) + utils._template_dir = self.old_template_dir + + def test_render(self): + tf = os.path.join(utils._template_dir, 'test') + tfd = open(tf, 'w') + tfd.write('this is a %(sub)s\n') + tfd.close() + of = os.path.join(self.tmpdir, 'test.out') + utils.render(of, 'test', {'sub':'TEST'}) + self.assert_(os.path.exists(of)) + output = open(of, 'r').read() + self.assertEquals('this is a TEST\n', output) + + def test_parse_bool(self): + self.assertTrue(utils.parse_bool('y')) + self.assertTrue(utils.parse_bool('1')) + self.assertTrue(utils.parse_bool('true')) + self.assertFalse(utils.parse_bool('false')) + self.assertFalse(utils.parse_bool('no')) + + +if __name__ == '__main__': + unittest.main() diff --git a/cam/utils.py b/cam/utils.py new file mode 100644 index 0000000..e5685f2 --- /dev/null +++ b/cam/utils.py @@ -0,0 +1,16 @@ +import os + + +_template_dir = os.path.join(os.path.dirname(__file__), 'templates') + + +def render(output_file, template_name, args): + tpl = open(os.path.join(_template_dir, template_name), 'r') + outfd = open(output_file, 'w') + outfd.write(tpl.read() % args) + tpl.close() + outfd.close() + + +def parse_bool(s): + return s.lower() in ('1', 'y', 'yes', 'true', 'on') diff --git a/config-example b/config-example deleted file mode 100644 index 4adcee5..0000000 --- a/config-example +++ /dev/null @@ -1,30 +0,0 @@ - -[global] -root = /src/cam/test - -[ca] -name = Example Certification Authority -org = Example -country = AA -default_days = 365 -email = ca@domain.org -base_url = http://ca.domain.org/public/ - -[web] -cn = www.domain.org -alt_names = www1.domain.org, www2.domain.org, - www3.domain.org, www4.domain.org, www5.domain.org -ou = Example web services - -[imap] -cn = mail.domain.org -alt_names = mail.domain.org, imap.domain.org, domain.org -ou = Example mail services - -[smtp] -cn = smtp.domain.org -alt_names = mx1.domain.org, mx2.domain.org, mx3.domain.org, - mx4.domain.org, mx5.domain.org, smtp.domain.org, - mx.domain.org -ou = Example mail services - diff --git a/lib/cfg.py b/lib/cfg.py deleted file mode 100644 index 3075a2e..0000000 --- a/lib/cfg.py +++ /dev/null @@ -1,20 +0,0 @@ - -__all__ = [ 'cfg', 'ca_base', 'ca', 'config_file_path' ] - -import os, sys -import ConfigParser -from utils import * - -config_file_path = os.path.join(os.path.dirname(sys.argv[0]),'config') -if not os.path.exists(config_file_path): - print ''' -The configuration file '%s' does not exist. -Please create this file and fill in the configuration data -of your certificates so that 'cam' can operate. -''' % config_file_path - sys.exit(1) - -cfg = ConfigParser.ConfigParser() -cfg.read(config_file_path) -ca = cfg2dict(cfg, 'ca') -ca_base = cfg.get('global', 'root') diff --git a/lib/check.py b/lib/check.py deleted file mode 100644 index a6a67ec..0000000 --- a/lib/check.py +++ /dev/null @@ -1,26 +0,0 @@ - -import os -from time import time, strftime, localtime -from utils import * -from paths import * -from cfg import * - - -def check(): - now = time() - warning_days = 15 - if cfg.has_option('global', 'warning_days'): - warning_days = int(cfg.get('global', 'warning_days')) - for tag in cfg.sections(): - if tag == 'global' or tag == 'ca' or tag == 'DEFAULT': - continue - crt_file = getpath('rsa_crt', tag) - crt_date = getcertdate(crt_file) - if crt_date: - days = int((now - crt_date)/86400) - if days > -warning_days: - print ''' -The certificate '%s' should be renewed! -(Expiration date: %s) -''' % (tag, strftime('%d/%m/%Y', localtime(crt_date))) - diff --git a/lib/files.py b/lib/files.py deleted file mode 100644 index 5f16887..0000000 --- a/lib/files.py +++ /dev/null @@ -1,32 +0,0 @@ - -import os -from cfg import * -from paths import * - -fields = [ - ('conf', 'OpenSSL configuration file'), - ('rsa_key', 'RSA private key'), - ('rsa_crt', 'RSA certificate'), - ('dsa_key', 'DSA private key'), - ('dsa_crt', 'DSA certificate'), - ('dsa_parms', 'DSA parameters'), - ('public_crt', 'public certificates bundle'), - ('singlefile', 'single cert. file (with keys)') - ] - -def files(tag): - print ''' - -Files related to the '%s' certificates: - -''' % tag - for k, desc in fields: - p = getpath(k, tag) - star = ' ' - if not os.path.exists(p): - star = '*' - print '%-30s %s%s' % (desc, star, p) - - print '(* = not found)' - print - diff --git a/lib/gen.py b/lib/gen.py deleted file mode 100644 index 42795fe..0000000 --- a/lib/gen.py +++ /dev/null @@ -1,115 +0,0 @@ - - -import os, sys -import logging -from utils import * -from templates import * -from paths import * -from cfg import * - - -def gen(tag): - - info = cfg2dict(cfg, tag) - - conf_file = getpath('conf', tag) - rsa_key_file = getpath('rsa_key', tag) - dsa_key_file = getpath('dsa_key', tag) - dsa_parms_file = getpath('dsa_parms', tag) - csr_file = getpath('rsa_csr', tag) - dsa_csr_file = getpath('dsa_csr', tag) - ext_file = getpath('ext', tag) - public_crt_file = getpath('public_crt', tag) - crt_file = getpath('rsa_crt', tag) - dsa_crt_file = getpath('dsa_crt', tag) - sf_file = getpath('singlefile', tag) - - if os.path.exists(public_crt_file): - print - if expired(getcertdate(public_crt_file)): - print 'Certificate has expired. Ready to re-generate.' - else: - print - ans = raw_input('This certificate seems to exist already (in %s).\nAre you really sure that you want to re-create it? [y/N] ' % crt_file) - if not ans or ans[0].lower() != 'y': - sys.exit(0) - print 'Revoking previous certificate...' - openssl('ca', '-config', conf_file, - '-revoke', public_crt_file) - - - # create custom config file - template(conf_file, - openssl_conf_template, - dict( - ca_dir = ca_base, - default_days = ca['default_days'], - country = d2get(info, ca, 'country'), - org = d2get(info, ca, 'org'), - ou = d2get(info, ca, 'ou', ''), - cn = info['cn'], - email = d2get(info, ca, 'email'))) - - # create dsa parameters - openssl('dsaparam', '-out', dsa_parms_file, '1024') - - # create rsa key - openssl('req', '-batch', '-new', '-keyout', rsa_key_file, - '-config', conf_file, '-nodes', '-out', csr_file) - openssl('req', '-batch', '-new', '-newkey', 'dsa:' + dsa_parms_file, - '-keyout', dsa_key_file, '-nodes', - '-config', conf_file, '-out', dsa_csr_file) - - # create ext file - altnames = [ x.strip() for x in info['alt_names'].split(',') ] - altnames_s = '' - for i in range(len(altnames)): - altnames_s += 'DNS.%d=%s\n' % (i + 1, altnames[i]) - template(ext_file, - ext_template, - dict( - ca_name = ca['name'], - ca_base_url = ca['base_url'], - alt_names = altnames_s)) - - # sign requests - openssl('ca', '-days', ca['default_days'], - '-config', conf_file, '-batch', - '-policy', 'policy_anything', - '-out', crt_file, - '-extfile', ext_file, - '-infiles', csr_file) - openssl('ca', '-days', ca['default_days'], - '-config', conf_file, '-batch', - '-policy', 'policy_anything', - '-out', dsa_crt_file, - '-extfile', ext_file, - '-infiles', dsa_csr_file) - f = open(public_crt_file, 'w') - f.write(open(crt_file, 'r').read()) - f.write(open(dsa_crt_file, 'r').read()) - f.close() - - # create single-file file - f = open(sf_file, 'w') - f.write(open(crt_file, 'r').read()) - f.write(open(dsa_crt_file, 'r').read()) - f.write(open(rsa_key_file, 'r').read()) - f.write(open(dsa_key_file, 'r').read()) - f.close() - - logging.info('created certificate %s [%s]' % (tag, info['cn'])) - - print ''' -Certificate '%s': - - CN: %s - AltNames: %s - - RSA key: %s - DSA key: %s - public crt: %s - all-in-one: %s - -''' % (tag, info['cn'], ', '.join(altnames), - rsa_key_file, dsa_key_file, public_crt_file, sf_file) diff --git a/lib/initfs.py b/lib/initfs.py deleted file mode 100644 index b2527bc..0000000 --- a/lib/initfs.py +++ /dev/null @@ -1,16 +0,0 @@ - - -import os -import logging -from utils import * -from cfg import * - - -def initfs(): - mkdir(ca_base) - for sub in [ 'archive', 'ext', 'conf', - 'private', 'private/certs', 'private/single-file', - 'public', 'public/certs', 'public/crl', 'newcerts' ]: - mkdir(os.path.join(ca_base, sub)) - os.chmod(os.path.join(ca_base, 'private'), 0700) - logging.info('created directory structure') diff --git a/lib/list.py b/lib/list.py deleted file mode 100644 index 5b32508..0000000 --- a/lib/list.py +++ /dev/null @@ -1,69 +0,0 @@ - -import os, re -import commands -from time import time, strftime, localtime -from utils import * -from templates import * -from paths import * -from cfg import * - - - -def list(args): - verbose = 0 - if len(args) > 0 and args[0] == '-v': - verbose = 1 - - print ''' - -List of known certificates for '%s' CA: - -''' % ca['name'] - - i = 0 - sections = cfg.sections() - sections.sort() - for sec in sections: - if sec == 'ca' or sec == 'global': - continue - i += 1 - crt_file = getpath('rsa_crt', sec) - dsa_crt_file = getpath('dsa_crt', sec) - if os.path.exists(crt_file): - crt_date = getcertdate(crt_file) - days = int((time() - crt_date) / 86400) - if days > 0: - days_s = '%d days ago' % days - else: - days_s = '%d days left' % (-days) - if expired(crt_date): - m = 'EXPIRED %s' % days_s.upper() - star = '*' - else: - if verbose: - m = '\n%21s Expiration: %s (%s)' % \ - (' ', - strftime('%d %b %Y', localtime(crt_date)), - days_s) - else: - m = 'exp. %s' % strftime('%d/%m/%Y', - localtime(crt_date)) - star = ' ' - else: - m = 'NOT FOUND' - star = '*' - print '%3d. %s%-15s %s [%s] - %s' % (i, star, sec, cfg.get(sec, 'cn'), cfg.get(sec, 'alt_names').replace('\n', ' '), m) - if star != '*' and verbose: - # do fingerprints - for hname, hash in [ ('MD5', 'md5'), ('SHA1', 'sha1') ]: - for cypher, file in [ ( 'RSA', crt_file), ( 'DSA', dsa_crt_file ) ]: - fp = fingerprint(hash, file) - print '%21s %s %s fingerprint: %s' % ('', cypher, hname, fp) - - if verbose: - print - - print ''' -(* = certificate does not exist, create with 'cam gen ') -''' - diff --git a/lib/newca.py b/lib/newca.py deleted file mode 100644 index 63fcea2..0000000 --- a/lib/newca.py +++ /dev/null @@ -1,62 +0,0 @@ - -import os, logging -from utils import * -from templates import * -from cfg import * - - -def newca(): - - conf_file = os.path.join(ca_base, 'conf/ca.conf') - ca_file = os.path.join(ca_base, 'public/ca.pem') - ca_dsa_file = os.path.join(ca_base, 'public/ca-dsa.tmp') - ca_key_file = os.path.join(ca_base, 'private/ca.key') - ca_dsa_key_file = os.path.join(ca_base, 'private/ca-dsa.key') - ca_csr_file = os.path.join(ca_base, 'newcerts/ca.csr') - ca_dsa_csr_file = os.path.join(ca_base, 'newcerts/ca-dsa.csr') - dsa_parms_file = os.path.join(ca_base, 'private/ca.dsap') - - serial_file = os.path.join(ca_base, 'serial') - index_file = os.path.join(ca_base, 'index') - if not os.path.exists(serial_file): - open(serial_file, 'w').write('01') - if not os.path.exists(index_file): - open(index_file, 'w').close() - - template(conf_file, - openssl_conf_template, - dict( - ca_dir = ca_base, - default_days = ca['default_days'], - country = ca['country'], - org = ca['org'], - ou = ca.get('ou', ''), - cn = ca['name'], - email = ca['email'])) - if not os.path.exists(dsa_parms_file): - openssl('dsaparam', '-out', dsa_parms_file, '1024') - logging.info('generated CA DSA parameters') - if not os.path.exists(ca_file): - openssl('req', '-new', '-keyout', ca_key_file, - '-config', conf_file, '-batch', - '-out', ca_csr_file) - openssl('req', '-new', '-newkey', 'dsa:' + dsa_parms_file, - '-config', conf_file, '-batch', - '-keyout', ca_dsa_key_file, - '-out', ca_dsa_csr_file) - openssl('ca', - '-config', conf_file, '-batch', - '-keyfile', ca_key_file, - '-extensions', 'v3_ca', - '-out', ca_file, '-selfsign', - '-infiles', ca_csr_file) - openssl('ca', - '-config', conf_file, '-batch', - '-keyfile', ca_dsa_key_file, - '-extensions', 'v3_ca', - '-out', ca_dsa_file, '-selfsign', - '-infiles', ca_dsa_csr_file) - open(ca_file, 'a').write(open(ca_dsa_file, 'r').read()) - os.remove(ca_dsa_file) - logging.info('created CA certificates') - diff --git a/lib/paths.py b/lib/paths.py deleted file mode 100644 index ce8234b..0000000 --- a/lib/paths.py +++ /dev/null @@ -1,25 +0,0 @@ - -__all__ = [ 'getpath' ] - -import os -from cfg import * - - -path_exts = dict( - conf = 'conf/%s.conf', - rsa_key = 'private/%s.key', - dsa_key = 'private/%s-dsa.key', - dsa_parms = 'private/%s.dsap', - rsa_csr = 'newcerts/%s.csr', - dsa_csr = 'newcerts/%s-dsa.csr', - rsa_crt = 'private/certs/%s.pem', - dsa_crt = 'private/certs/%s-dsa.pem', - ext = 'ext/%s.ext', - public_crt = 'public/certs/%s.pem', - singlefile = 'private/single-file/%s.pem', - ) - -def getpath(what, tag): - return os.path.join(ca_base, path_exts[what] % tag) - - diff --git a/lib/utils.py b/lib/utils.py deleted file mode 100644 index 813ddef..0000000 --- a/lib/utils.py +++ /dev/null @@ -1,72 +0,0 @@ - -__all__ = [ - 'getcertdate', 'fingerprint', 'expired', - 'cfg2dict', 'mkdir', 'template', - 'dictmerge', 'd2get', - 'openssl' - ] - - -import commands -import os, re -import time - - -def getcertdate(crt_file): - if os.path.exists(crt_file): - o = commands.getoutput('openssl x509 -in %s -noout -dates' % crt_file) - m = re.search(r'notAfter=(.*)', o) - if m: - return time.mktime(time.strptime(m.group(1), - '%b %d %H:%M:%S %Y %Z')) - return 0 - -def expired(crtdate): - if crtdate < time.time(): - return 1 - else: - return 0 - -def fingerprint(hash, crt_file): - if os.path.exists(crt_file): - o = commands.getoutput('openssl x509 -in %s -noout -fingerprint -%s' % (crt_file, hash)) - m = re.search(r'=(.*)$', o) - if m: - return m.group(1) - return 'NOT FOUND' - -def cfg2dict(cfg, section): - d = dict() - for k, v in cfg.items(section): - d[k] = v - return d - -def mkdir(p): - try: - os.mkdir(p, 0755) - print 'created directory \'%s\'' % p - except OSError: - pass - -def template(file, t, args): - f = open(file, 'w') - f.write(t % args) - f.close() - -def dictmerge(d1, d2): - out = dict() - for d in [ d2, d1 ]: - for k, v in d.items(): - out[k] = v - return out - -def d2get(d1, d2, k, default=None): - return d1.get(k, d2.get(k, default)) - -def openssl(*args): - cmd = "openssl " + ' '.join(["'%s'" % x for x in args]) - print 'executing "%s"' % cmd - return os.system(cmd) - - - diff --git a/sample.conf b/sample.conf new file mode 100644 index 0000000..1936782 --- /dev/null +++ b/sample.conf @@ -0,0 +1,27 @@ + +[ca] +cn = Example Certification Authority +org = Example +country = AA +default_days = 365 +email = ca@domain.org +crl_url = http://ca.domain.org/ca.crl + +[web] +cn = www.domain.org +alt_names = www1.domain.org www2.domain.org + www3.domain.org www4.domain.org www5.domain.org +ou = Example web services + +[imap] +cn = mail.domain.org +alt_names = mail.domain.org imap.domain.org domain.org +ou = Example mail services + +[smtp] +cn = smtp.domain.org +alt_names = mx1.domain.org mx2.domain.org mx3.domain.org + mx4.domain.org mx5.domain.org smtp.domain.org + mx.domain.org +ou = Example mail services + diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..77e562e --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +[egg_info] +tag_build = .dev + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..1c20350 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +#!/usr/bin/python + +from setuptools import setup, find_packages + +setup( + name="cam", + version="2.0", + description="X509 Certification Authority management", + author="ale", + author_email="ale@incal.net", + url="http://code.autistici.org/p/cam", + install_requires=[], + setup_requires=[], + zip_safe=False, + packages=find_packages(), + entry_points={ + "console_scripts": [ + "cam = cam.main:main", + ], + }, + ) +