--- /dev/null
+
+cam - minimal X509 Certification Authority management
+=====================================================
+
+`cam` is a tiny Python program that can be used to manage a X509
+certification authority for a small organization. It can only create
+server certificates, so this is not going to be useful to manage an
+X509-based client authentication infrastructure.
+
+The intended usage involves describing the list of certificates to
+generate in a configuration file, and using the `cam' tool to create
+and renew them.
+
+
+Configuration
+-------------
+
+The configuration file uses INI-like syntax, consisting of a number of
+sections. There are two special sections: `ca` and `global`, any other
+section is interpreted as a certificate definition.
+
+The `ca` section contains the attributes of the CA itself, see the
+example configuration file to see which attributes are supported.
+
+The `global` section contains configuration parameters for `cam`. The
+only configuration parameter supported is `root_dir`, which is where all
+the CA private data will be stored. If you leave this parameter empty,
+or if you don't define a `global` section at all, this will default to
+the directory containing the configuration file.
+
+Certificates are intentified by a ''tag'', (the section name), so for
+example given the following configuration snippet::
+
+ [web]
+ cn = www.domain.org
+
+you would use the following command to generate it::
+
+ $ cam --config=my.config gen web
+
+Certificates and private keys are saved within the CA data directory,
+you can obtain their path with::
+
+ $ cam --config=my.config files web
+ /your/ca/dir/public/certs/web.pem
+ /your/ca/dir/private/web.key
+
+
+Installation
+------------
+
+The CA private keys are very sensitive information, so you'll want to
+store them in some encrypted removable storage. You can bundle the `cam`
+application itself with the CA data by using `virtualenv`::
+
+ $ virtualenv --no-site-packages /secure/cam
+ $ virtualenv --relocatable /secure/cam
+ $ (cd /tmp ; git clone http://git.autistici.org/cam.git \
+ && /secure/cam/bin/python setup.py install)
+
+Then you can simply mount your encrypted image wherever there is a
+Python interpreter available (well, with the same architecture/OS too)
+and run::
+
+ $ /secure/cam/bin/cam --config=/secure/ca/my.config ...
+
+
+++ /dev/null
-#!/usr/bin/python
-
-
-import os, sys
-import logging
-sys.path.insert(0, os.path.join(os.path.dirname(sys.argv[0]), 'lib'))
-
-from utils import *
-from cfg import *
-
-# commands
-from gen import gen
-from newca import newca
-from list import list
-from files import files
-from initfs import initfs
-from check import check
-
-
-def Usage():
- print '''
-CAM v0.1 - (c)2006 by <ale@incal.net>
-A Certification Authority manager for complex situations.
-Usage: %s <COMMAND> [<ARG>...]
-Known commands:
-
- init
- Initialize the environment by creating the necessary
- directory structure
-
- newca [<RSA_CRT> [<DSA_CRT>]]
- Create a new CA certificate (otherwise you can import
- your own certificates)
-
- gen <TAG>...
- Create (or re-create) the certificates corresponding
- to TAG
-
- list
- List all known certificates
-
- files <TAG>...
- Dump all the certificate-related files of this TAG
-
- check
- Should be run weekly from a cron job to warn you if
- some certificates are about to expire (controlled by
- the 'warning_days' parameter in the 'global' section
- of the configuration)
-
-
-The configuration will be read from '%s'.
-It consists of a ini-style file, with one 'ca' section that
-specifies global CA parameters, and more sections for each
-tag with certificate-specific information. See the examples
-for more details on how to write your own configuration.
-
-''' % (sys.argv[0], config_file_path)
- sys.exit(0)
-
-
-if len(sys.argv) < 2 or sys.argv[1] == 'help':
- Usage()
-
-cmd = sys.argv[1]
-if cmd == 'init':
- initfs()
-elif cmd == 'gen':
- for tag in sys.argv[2:]:
- gen(tag)
-elif cmd == 'newca':
- newca()
-elif cmd == 'list':
- list(sys.argv[2:])
-elif cmd == 'files':
- for tag in sys.argv[2:]:
- files(tag)
-elif cmd == 'check':
- check()
-else:
- print 'Unknown command \'%s\'.' % cmd
- Usage()
-
-
--- /dev/null
+import errno
+import fcntl
+import logging
+import re
+import os
+import getpass
+import random
+import shutil
+import tempfile
+import time
+from cam import openssl_wrap
+from cam import utils
+
+log = logging.getLogger(__name__)
+
+
+class _CAFiles(object):
+
+ def __init__(self, basedir, **attrs):
+ for key, value in attrs.items():
+ setattr(self, key, os.path.join(basedir, value))
+
+
+class CA(object):
+
+ def __init__(self, basedir, config, password=None):
+ self._pw = password
+ self.basedir = basedir
+ self.config = {'basedir': basedir, 'default_days': '365', 'ou': 'CA',
+ 'days': '3650', 'country': 'XX', 'crl_url': '',
+ 'bits': '4096'}
+ self.config.update(config)
+ self.files = _CAFiles(basedir,
+ conf='conf/ca.conf',
+ public_key='public/ca.pem',
+ private_key='private/ca.key',
+ crl='public/crl.pem',
+ serial='serial',
+ crlnumber='crlnumber',
+ index='index')
+ self._lock()
+
+ def _getpw(self):
+ if self._pw is None:
+ self._pw = getpass.getpass(prompt='CA Password: ')
+ return self._pw
+
+ def _lock(self):
+ self._lockfd = open(os.path.join(self.basedir, '_lock'), 'w+')
+ n = 3
+ while True:
+ try:
+ fcntl.lockf(self._lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ break
+ except IOError, e:
+ if e.errno in (errno.EACCES, errno.EAGAIN):
+ n -= 1
+ if n == 0:
+ log.error('another instance is running')
+ raise
+ time.sleep(1)
+ continue
+ raise
+
+ def _unlock(self):
+ fcntl.lockf(self._lockfd, fcntl.LOCK_UN)
+ self._lockfd.close()
+
+ def close(self):
+ self._unlock()
+
+ def create(self):
+ old_umask = os.umask(077)
+
+ for pathext in ('', 'conf', 'public', 'public/certs',
+ 'public/crl', 'private', 'newcerts'):
+ fullpath = os.path.join(self.basedir, pathext)
+ if not os.path.isdir(fullpath):
+ os.mkdir(fullpath)
+
+ if not os.path.exists(self.files.index):
+ log.info('creating new index file')
+ open(self.files.index, 'w').close()
+
+ if not os.path.exists(self.files.serial):
+ serial = random.randint(1, 1000000000)
+ log.info('initializing serial number (%d)', serial)
+ with open(self.files.serial, 'w') as fd:
+ fd.write('%08X\n' % serial)
+
+ if not os.path.exists(self.files.crlnumber):
+ with open(self.files.crlnumber, 'w') as fd:
+ fd.write('01\n')
+
+ # Create the OpenSSL configuration file.
+ utils.render(self.files.conf, 'openssl_config', self.config)
+
+ # Generate keys if they do not exist.
+ if not os.path.exists(self.files.public_key):
+ tmpdir = tempfile.mkdtemp()
+ csr_file = os.path.join(tmpdir, 'ca.csr')
+ log.info('creating new RSA CA CSR')
+ openssl_wrap.run_with_config(
+ self.files.conf, 'req', '-new',
+ '-passout', 'pass:%s' % self._getpw(),
+ '-keyout', self.files.private_key, '-out', csr_file)
+ log.info('self-signing RSA CA certificate')
+ openssl_wrap.run_with_config(
+ self.files.conf, 'ca', '-keyfile', self.files.private_key,
+ '-key', self._getpw(),
+ '-extensions', 'v3_ca', '-out', self.files.public_key,
+ '-days', self.config.get('days', self.config['default_days']),
+ '-selfsign', '-infiles', csr_file)
+ shutil.rmtree(tmpdir)
+
+ os.umask(old_umask)
+
+ # Make some files public.
+ for path in (os.path.join(self.basedir, 'public'),
+ os.path.join(self.basedir, 'public/certs'),
+ self.files.public_key):
+ if os.path.isdir(path):
+ os.chmod(path, 0755)
+ else:
+ os.chmod(path, 0644)
+
+ def gencrl(self):
+ log.info('generating CRL')
+ openssl_wrap.run_with_config(
+ self.files.conf, 'ca', '-gencrl', '-out', self.files.crl,
+ '-key', self._getpw())
+ os.chmod(self.files.crl, 0644)
+
+ def revoke(self, cert):
+ log.info('revoking certificate %s', cert.name)
+ openssl_wrap.run_with_config(
+ self.files.conf, 'ca', '-revoke', cert.public_key_file,
+ '-key', self._getpw())
+ self.gencrl()
+
+ def generate(self, cert):
+ expiry = cert.get_expiration_date()
+ if expiry and expiry > time.time():
+ log.warn('certificate is still valid, revoking previous version')
+ self.revoke(cert)
+
+ log.info('generating new certificate %s', cert.name)
+ tmpdir = tempfile.mkdtemp()
+ try:
+ csr_file = os.path.join(tmpdir, '%s.csr' % cert.name)
+ conf_file = os.path.join(tmpdir, '%s.conf' % cert.name)
+ ext_file = os.path.join(tmpdir, '%s-ext.conf' % cert.name)
+ conf = {}
+ conf.update(self.config)
+ conf['cn'] = cert.cn
+ conf['days'] = cert.days or self.config['default_days']
+ if cert.ou:
+ conf['ou'] = cert.ou
+ conf['alt_names'] = ''.join(
+ ['DNS.%d=%s\n' % (idx + 1, x)
+ for idx, x in enumerate(cert.alt_names)])
+ utils.render(conf_file, 'openssl_config', conf)
+ utils.render(ext_file, 'ext_config', conf)
+ openssl_wrap.run_with_config(
+ conf_file, 'req', '-new', '-keyout', cert.private_key_file,
+ '-nodes', '-out', csr_file)
+ os.chmod(cert.private_key_file, 0600)
+ openssl_wrap.run_with_config(
+ conf_file, 'ca', '-days', conf['days'],
+ '-key', self._getpw(),
+ '-policy', 'policy_anything', '-out', cert.public_key_file,
+ '-extfile', ext_file, '-infiles', csr_file)
+ finally:
+ shutil.rmtree(tmpdir)
+
--- /dev/null
+import os
+import re
+import string
+import time
+from cam import openssl_wrap
+
+
+def _parse_alt_names(s):
+ if not s:
+ return []
+ if ',' in s:
+ parts = s.split(',')
+ else:
+ parts = s.split()
+ return [x.strip() for x in parts if x]
+
+
+class Cert(object):
+
+ def __init__(self, ca, name, config):
+ self.name = name
+ self.ca = ca
+ self.cn = config['cn']
+ self.ou = config.get('ou', '')
+ self.days = config.get('days')
+
+ self.alt_names = _parse_alt_names(config.get('alt_names'))
+ if self.cn not in self.alt_names:
+ self.alt_names.insert(0, self.cn)
+ self.public_key_file = os.path.join(ca.basedir, 'public', 'certs',
+ '%s.pem' % name)
+ self.private_key_file = os.path.join(ca.basedir, 'private',
+ '%s.key' % name)
+
+ def get_fingerprint(self, digest='sha1'):
+ if os.path.exists(self.public_key_file):
+ output = openssl_wrap.run('x509', '-in', self.public_key_file,
+ '-noout', '-fingerprint', '-%s' % digest)
+ m = re.search(r'=(.*)$', output)
+ if m:
+ return m.group(1)
+ return None
+
+ def get_expiration_date(self):
+ if os.path.exists(self.public_key_file):
+ output = openssl_wrap.run('x509', '-in', self.public_key_file,
+ '-noout', '-dates')
+ m = re.search(r'notAfter=(.*)', output)
+ if m:
+ return time.mktime(time.strptime(m.group(1),
+ '%b %d %H:%M:%S %Y %Z'))
+ return None
+
+ def expired(self):
+ now = time.time()
+ return self.get_expiration_date() > now
+
+
--- /dev/null
+import ConfigParser
+import os
+from cam import cert
+from cam import ca
+
+
+class ConfigError(Exception):
+ pass
+
+
+def read_config(filename):
+ parser = ConfigParser.ConfigParser()
+ if not parser.read(filename):
+ raise ConfigError('File not found: %s' % filename)
+ root_dir = os.path.dirname(os.path.abspath(filename))
+ global_config = {}
+ if parser.has_section('global'):
+ global_config = dict(parser.items('global'))
+ root_dir = global_config.get('root_dir', root_dir)
+ ca_obj = ca.CA(root_dir, dict(parser.items('ca')))
+ certs = []
+ for section in parser.sections():
+ if section in ('ca', 'global'):
+ continue
+ certs.append(cert.Cert(ca_obj, section, dict(parser.items(section))))
+ return global_config, ca_obj, certs
--- /dev/null
+#!/usr/bin/python
+
+import logging
+import optparse
+import os
+import sys
+import time
+from cam import config
+
+
+USAGE = '''cam [<OPTIONS>] <COMMAND> [<ARG>...]
+CAM v2.0 - (c)2012 by <ale@incal.net>
+A Certification Authority manager for complex situations.
+
+Known commands:
+
+ init [<RSA_CRT> [<DSA_CRT>]]
+ Initialize the environment and create a new CA certificate
+ (you can also import your own existing certificates)
+
+ gen <TAG>...
+ Create (or re-create) the certificates corresponding
+ to TAG
+
+ gencrl
+ Update the CRL
+
+ list
+ List all known certificates
+
+ files <TAG>...
+ Dump all the certificate-related files of this TAG
+
+ check
+ Should be run weekly from a cron job to warn you if some
+ certificates are about to expire (controlled by the 'warning_days'
+ parameter in the 'global' section of the configuration)
+
+The configuration file consists of a ini-style file, with one 'ca'
+section that specifies global CA parameters, and more sections for
+each tag with certificate-specific information. See the examples for
+more details on how to write your own configuration.
+'''
+
+
+def find_cert(certs, name):
+ for c in certs:
+ if c.name == name:
+ return c
+ raise Exception('Certificate "%s" not found' % name)
+
+
+def main():
+ parser = optparse.OptionParser(usage=USAGE)
+ parser.add_option('-d', '--debug', dest='debug', help='Be verbose',
+ action='store_true')
+ parser.add_option('-c', '--config', dest='config', help='Config file')
+ opts, args = parser.parse_args()
+ if not opts.config:
+ parser.error('Must specify --config')
+ if len(args) < 1:
+ parser.error('Must specify a command')
+
+ logging.basicConfig()
+ logging.getLogger().setLevel(opts.debug and logging.DEBUG or logging.INFO)
+
+ global_config, ca, certs = config.read_config(opts.config)
+
+ cmd, args = args[0], args[1:]
+
+ try:
+ if cmd == 'init':
+ ca.create()
+ elif cmd == 'gen':
+ if len(args) != 1:
+ parser.error('Wrong number of arguments')
+ ca.generate(find_cert(certs, args[0]))
+ elif cmd == 'gencrl':
+ ca.gencrl()
+ elif cmd == 'files':
+ if len(args) != 1:
+ parser.error('Wrong number of arguments')
+ c = find_cert(certs, args[0])
+ print c.public_key_file
+ print c.private_key_file
+ elif cmd == 'list':
+ for cert in sorted(certs, key=lambda x: x.name):
+ print cert.name, cert.cn, cert.get_expiration_date()
+ elif cmd == 'check':
+ now = time.time()
+ warning_time = 8640000 * int(global_config.get('warning_days', 15))
+ for cert in certs:
+ exp = cert.get_expiration_date()
+ if exp and (exp - now) < warning_time:
+ print '%s (%s) is about to expire.' % (cert.name, cert.cn)
+ else:
+ parser.error('unknown command "%s"' % cmd)
+ finally:
+ ca.close()
+
+
+def main_wrapper():
+ try:
+ main()
+ return 0
+ except Exception, e:
+ logging.exception('uncaught exception')
+ return 1
+
+
+if __name__ == '__main__':
+ sys.exit(main_wrapper())
--- /dev/null
+import logging
+import subprocess
+
+log = logging.getLogger(__name__)
+
+
+class CommandError(Exception):
+ pass
+
+
+def run(*args):
+ cmd = ['openssl']
+ cmd.extend(args)
+ log.debug('executing "%s"' % ' '.join(cmd))
+ pipe = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ stdout, _ = pipe.communicate()
+ if pipe.returncode != 0:
+ raise CommandError('openssl exited with status %d' % (
+ pipe.returncode,))
+ return stdout
+
+
+def run_with_config(config_file, *args):
+ cmd = args[0]
+ args = args[1:]
+ return run(cmd, '-config', config_file, '-batch', *args)
--- /dev/null
+basicConstraints = CA:false
+nsCertType = client, server
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+extendedKeyUsage = clientAuth, serverAuth
+nsComment = "%(cn)s"
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid, issuer:always
+subjectAltName = @subject_alt_name
+issuerAltName = issuer:copy
+nsCaRevocationUrl = %(crl_url)s
+nsRevocationUrl = %(crl_url)s
+crlDistributionPoints = @cdp_section
+
+[ subject_alt_name ]
+%(alt_names)s
+email = copy
+
+[ cdp_section ]
+URI.1 = %(crl_url)s
-
-openssl_conf_template = '''
-RANDFILE = %(ca_dir)s/.random
+RANDFILE = %(basedir)s/.random
[ ca ]
default_ca = CA_default
[ CA_default ]
-dir = %(ca_dir)s
+dir = %(basedir)s
certs = $dir/public/certs
crl_dir = $dir/public/crl
crl = $dir/public/crl.pem
+crlnumber = $dir/crlnumber
database = $dir/index
serial = $dir/serial
new_certs_dir = $dir/newcerts
emailAddress = optional
[ req ]
-default_bits = 4096
+default_bits = %(bits)s
default_md = sha1
distinguished_name = req_distinguished_name
attributes = req_attributes
subjectAltName = email:copy
issuerAltName = issuer:copy
-'''
-
-ext_template = '''
-basicConstraints = CA:false
-nsCertType = client, server
-keyUsage = nonRepudiation, digitalSignature, keyEncipherment
-extendedKeyUsage = clientAuth, serverAuth
-nsComment = "%(ca_name)s"
-subjectKeyIdentifier = hash
-authorityKeyIdentifier = keyid, issuer:always
-subjectAltName = @subject_alt_name
-issuerAltName = issuer:copy
-nsCaRevocationUrl = %(ca_base_url)s/crl.pem
-nsRevocationUrl = %(ca_base_url)s/crl.pem
-crlDistributionPoints = @cdp_section
-
-[ subject_alt_name ]
-%(alt_names)s
-email = copy
-
-[ cdp_section ]
-URI.1 = %(ca_base_url)s/crl.pem
-'''
-
--- /dev/null
+import logging
+import os
+import tempfile
+import shutil
+import unittest
+from cam import ca
+from cam import openssl_wrap
+
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+class CertStub(object):
+
+ def __init__(self, name, cn, tmpdir):
+ self.name = name
+ self.cn = cn
+ self.alt_names = [cn]
+ self.ou = None
+ self.days = '365'
+ self.public_key_file = os.path.join(tmpdir, '%s.pub' % name)
+ self.private_key_file = os.path.join(tmpdir, '%s.priv' % name)
+
+ def get_expiration_date(self):
+ return 123456789
+
+
+class CATest(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.ca = ca.CA(self.tmpdir,
+ {'cn': 'test ca', 'org': 'test',
+ 'bits': '1024', 'email': 'test@test.com'},
+ password='testpw')
+
+ def tearDown(self):
+ self.ca.close()
+ shutil.rmtree(self.tmpdir)
+
+ def test_create(self):
+ self.ca.create()
+ self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'conf/ca.conf')))
+
+ def test_create_cert(self):
+ self.ca.create()
+ cert = CertStub('test', 'www.test.com', self.tmpdir)
+ self.ca.generate(cert)
+ self.assertTrue(os.path.exists(cert.public_key_file))
+ self.assertTrue(os.path.exists(cert.private_key_file))
+
+ def test_revoke(self):
+ self.ca.create()
+ cert = CertStub('test', 'www.test.com', self.tmpdir)
+ self.ca.generate(cert)
+ self.ca.revoke(cert)
+ self.assertTrue(os.path.exists(os.path.join(self.tmpdir, 'public/crl.pem')))
--- /dev/null
+import os
+import tempfile
+import time
+import shutil
+import unittest
+from cam import cert
+
+
+TEST_PEM = '''-----BEGIN CERTIFICATE-----
+MIICNDCCAaECEAKtZn5ORf5eV288mBle3cAwDQYJKoZIhvcNAQECBQAwXzELMAkG
+A1UEBhMCVVMxIDAeBgNVBAoTF1JTQSBEYXRhIFNlY3VyaXR5LCBJbmMuMS4wLAYD
+VQQLEyVTZWN1cmUgU2VydmVyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MB4XDTk0
+MTEwOTAwMDAwMFoXDTEwMDEwNzIzNTk1OVowXzELMAkGA1UEBhMCVVMxIDAeBgNV
+BAoTF1JTQSBEYXRhIFNlY3VyaXR5LCBJbmMuMS4wLAYDVQQLEyVTZWN1cmUgU2Vy
+dmVyIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MIGbMA0GCSqGSIb3DQEBAQUAA4GJ
+ADCBhQJ+AJLOesGugz5aqomDV6wlAXYMra6OLDfO6zV4ZFQD5YRAUcm/jwjiioII
+0haGN1XpsSECrXZogZoFokvJSyVmIlZsiAeP94FZbYQHZXATcXY+m3dM41CJVphI
+uR2nKRoTLkoRWZweFdVJVCxzOmmCsZc5nG1wZ0jl3S3WyB57AgMBAAEwDQYJKoZI
+hvcNAQECBQADfgBl3X7hsuyw4jrg7HFGmhkRuNPHoLQDQCYCPgmc4RKz0Vr2N6W3
+YQO2WxZpO8ZECAyIUwxrl0nHPjXcbLm7qt9cuzovk2C2qUtN8iD3zV9/ZHuO3ABc
+1/p3yjkWWW8O6tO1g39NTUJWdrTJXwT4OPjr0l91X817/OWOgHz8UA==
+-----END CERTIFICATE-----
+'''
+
+TEST_SHA1 = '44:63:C5:31:D7:CC:C1:00:67:94:61:2B:B6:56:D3:BF:82:57:84:6F'
+TEST_MD5 = '74:7B:82:03:43:F0:00:9E:6B:B3:EC:47:BF:85:A5:93'
+TEST_EXPIRY = 1262908799.0
+
+
+class CAStub(object):
+
+ def __init__(self, basedir):
+ self.basedir = basedir
+
+
+class CertTest(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ os.makedirs(os.path.join(self.tmpdir, 'public', 'certs'))
+ self.ca = CAStub(self.tmpdir)
+ self.crt_file = os.path.join(self.tmpdir, 'public', 'certs', 'test.pem')
+ fd = open(self.crt_file, 'w')
+ fd.write(TEST_PEM)
+ fd.close()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def test_get_fingerprint(self):
+ crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'})
+ md5 = crt.get_fingerprint('md5')
+ self.assertEquals(TEST_MD5, md5)
+ sha1 = crt.get_fingerprint('sha1')
+ self.assertEquals(TEST_SHA1, sha1)
+
+ def test_get_fingerprint_nonexist(self):
+ crt = cert.Cert(self.ca, 'test-nonexist', {'cn': 'test.com'})
+ result = crt.get_fingerprint('md5')
+ self.assertEquals(None, result)
+
+ def test_cn_in_alt_names(self):
+ crt = cert.Cert(self.ca, 'test', {'cn': 'test.com',
+ 'alt_names': 'test2.com'})
+ self.assert_('test.com' in crt.alt_names)
+
+ def test_get_expiration_date(self):
+ crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'})
+ exp = crt.get_expiration_date()
+ self.assertEquals(TEST_EXPIRY, exp)
+
+ def test_get_expiration_date_nonexist(self):
+ crt = cert.Cert(self.ca, 'test-nonexist', {'cn': 'test.com'})
+ exp = crt.get_expiration_date()
+ self.assertEquals(None, exp)
+
+ def test_expired(self):
+ crt = cert.Cert(self.ca, 'test', {'cn': 'test.com'})
+ exp = crt.get_expiration_date()
+ now = time.time()
+ is_expired = (exp > now)
+ self.assertEquals(is_expired, crt.expired())
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import mox
+import os
+import tempfile
+import unittest
+from cam import config
+
+
+class ConfigTest(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+ self.mox = mox.Mox()
+
+ def tearDown(self):
+ self.mox.VerifyAll()
+ self.mox.UnsetStubs()
+ os.system("rm -fr '%s'" % self.tmpdir)
+
+ def test_read_config(self):
+ test_cfg = '''
+[global]
+root_dir = root
+
+[ca]
+something = else
+
+[cert1]
+cn = test.com
+'''
+ cf_file = os.path.join(self.tmpdir, 'config')
+ cf_fd = open(cf_file, 'w')
+ cf_fd.write(test_cfg)
+ cf_fd.close()
+ self.mox.StubOutWithMock(config.ca, 'CA', use_mock_anything=True)
+ config.ca.CA('root', {'something': 'else'}).AndReturn('ca')
+ self.mox.StubOutWithMock(config.cert, 'Cert', use_mock_anything=True)
+ config.cert.Cert('ca', 'cert1', {'cn': 'test.com'}).AndReturn('cert1')
+ self.mox.ReplayAll()
+
+ global_config, ca_obj, certs = config.read_config(cf_file)
+ self.assertEquals('ca', ca_obj)
+ self.assertEquals(['cert1'], certs)
+
+ def test_read_config_nonexist(self):
+ def f():
+ config.read_config('nonexist.conf')
+ self.assertRaises(config.ConfigError, f)
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import os
+import unittest
+import subprocess
+import mox
+from cam import openssl_wrap
+
+
+class PopenStub(object):
+
+ def __init__(self, stdout='', returncode=0):
+ self.stdout = stdout
+ self.returncode = returncode
+
+ def communicate(self):
+ return self.stdout, 'unused'
+
+
+class OpensslWrapTest(unittest.TestCase):
+
+ def setUp(self):
+ self.mox = mox.Mox()
+
+ def tearDown(self):
+ self.mox.VerifyAll()
+ self.mox.UnsetStubs()
+
+ def test_run(self):
+ self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
+ pipe_stub = PopenStub(stdout='output')
+ subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE
+ ).AndReturn(pipe_stub)
+ self.mox.ReplayAll()
+ result = openssl_wrap.run('test')
+ self.assertEquals('output', result)
+
+ def test_run_fails(self):
+ self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
+ pipe_stub = PopenStub(returncode=1)
+ subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE
+ ).AndReturn(pipe_stub)
+ self.mox.ReplayAll()
+ def r():
+ result = openssl_wrap.run('test')
+ self.assertRaises(openssl_wrap.CommandError, r)
+
+ def test_run_with_config(self):
+ self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
+ pipe_stub = PopenStub(stdout='output')
+ subprocess.Popen(['openssl', 'test', '-config', 'conf', '-batch', 'arg'],
+ stdout=subprocess.PIPE).AndReturn(pipe_stub)
+ self.mox.ReplayAll()
+ result = openssl_wrap.run_with_config('conf', 'test', 'arg')
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import os
+import unittest
+import tempfile
+from cam import utils
+
+
+class UtilsTest(unittest.TestCase):
+
+ def setUp(self):
+ self.old_template_dir = utils._template_dir
+ self.tmpdir = tempfile.mkdtemp()
+ utils._template_dir = self.tmpdir
+
+ def tearDown(self):
+ os.system('rm -fr "%s"' % self.tmpdir)
+ utils._template_dir = self.old_template_dir
+
+ def test_render(self):
+ tf = os.path.join(utils._template_dir, 'test')
+ tfd = open(tf, 'w')
+ tfd.write('this is a %(sub)s\n')
+ tfd.close()
+ of = os.path.join(self.tmpdir, 'test.out')
+ utils.render(of, 'test', {'sub':'TEST'})
+ self.assert_(os.path.exists(of))
+ output = open(of, 'r').read()
+ self.assertEquals('this is a TEST\n', output)
+
+ def test_parse_bool(self):
+ self.assertTrue(utils.parse_bool('y'))
+ self.assertTrue(utils.parse_bool('1'))
+ self.assertTrue(utils.parse_bool('true'))
+ self.assertFalse(utils.parse_bool('false'))
+ self.assertFalse(utils.parse_bool('no'))
+
+
+if __name__ == '__main__':
+ unittest.main()
--- /dev/null
+import os
+
+
+_template_dir = os.path.join(os.path.dirname(__file__), 'templates')
+
+
+def render(output_file, template_name, args):
+ tpl = open(os.path.join(_template_dir, template_name), 'r')
+ outfd = open(output_file, 'w')
+ outfd.write(tpl.read() % args)
+ tpl.close()
+ outfd.close()
+
+
+def parse_bool(s):
+ return s.lower() in ('1', 'y', 'yes', 'true', 'on')
+++ /dev/null
-
-[global]
-root = /src/cam/test
-
-[ca]
-name = Example Certification Authority
-org = Example
-country = AA
-default_days = 365
-email = ca@domain.org
-base_url = http://ca.domain.org/public/
-
-[web]
-cn = www.domain.org
-alt_names = www1.domain.org, www2.domain.org,
- www3.domain.org, www4.domain.org, www5.domain.org
-ou = Example web services
-
-[imap]
-cn = mail.domain.org
-alt_names = mail.domain.org, imap.domain.org, domain.org
-ou = Example mail services
-
-[smtp]
-cn = smtp.domain.org
-alt_names = mx1.domain.org, mx2.domain.org, mx3.domain.org,
- mx4.domain.org, mx5.domain.org, smtp.domain.org,
- mx.domain.org
-ou = Example mail services
-
+++ /dev/null
-
-__all__ = [ 'cfg', 'ca_base', 'ca', 'config_file_path' ]
-
-import os, sys
-import ConfigParser
-from utils import *
-
-config_file_path = os.path.join(os.path.dirname(sys.argv[0]),'config')
-if not os.path.exists(config_file_path):
- print '''
-The configuration file '%s' does not exist.
-Please create this file and fill in the configuration data
-of your certificates so that 'cam' can operate.
-''' % config_file_path
- sys.exit(1)
-
-cfg = ConfigParser.ConfigParser()
-cfg.read(config_file_path)
-ca = cfg2dict(cfg, 'ca')
-ca_base = cfg.get('global', 'root')
+++ /dev/null
-
-import os
-from time import time, strftime, localtime
-from utils import *
-from paths import *
-from cfg import *
-
-
-def check():
- now = time()
- warning_days = 15
- if cfg.has_option('global', 'warning_days'):
- warning_days = int(cfg.get('global', 'warning_days'))
- for tag in cfg.sections():
- if tag == 'global' or tag == 'ca' or tag == 'DEFAULT':
- continue
- crt_file = getpath('rsa_crt', tag)
- crt_date = getcertdate(crt_file)
- if crt_date:
- days = int((now - crt_date)/86400)
- if days > -warning_days:
- print '''
-The certificate '%s' should be renewed!
-(Expiration date: %s)
-''' % (tag, strftime('%d/%m/%Y', localtime(crt_date)))
-
+++ /dev/null
-
-import os
-from cfg import *
-from paths import *
-
-fields = [
- ('conf', 'OpenSSL configuration file'),
- ('rsa_key', 'RSA private key'),
- ('rsa_crt', 'RSA certificate'),
- ('dsa_key', 'DSA private key'),
- ('dsa_crt', 'DSA certificate'),
- ('dsa_parms', 'DSA parameters'),
- ('public_crt', 'public certificates bundle'),
- ('singlefile', 'single cert. file (with keys)')
- ]
-
-def files(tag):
- print '''
-
-Files related to the '%s' certificates:
-
-''' % tag
- for k, desc in fields:
- p = getpath(k, tag)
- star = ' '
- if not os.path.exists(p):
- star = '*'
- print '%-30s %s%s' % (desc, star, p)
-
- print '(* = not found)'
- print
-
+++ /dev/null
-
-
-import os, sys
-import logging
-from utils import *
-from templates import *
-from paths import *
-from cfg import *
-
-
-def gen(tag):
-
- info = cfg2dict(cfg, tag)
-
- conf_file = getpath('conf', tag)
- rsa_key_file = getpath('rsa_key', tag)
- dsa_key_file = getpath('dsa_key', tag)
- dsa_parms_file = getpath('dsa_parms', tag)
- csr_file = getpath('rsa_csr', tag)
- dsa_csr_file = getpath('dsa_csr', tag)
- ext_file = getpath('ext', tag)
- public_crt_file = getpath('public_crt', tag)
- crt_file = getpath('rsa_crt', tag)
- dsa_crt_file = getpath('dsa_crt', tag)
- sf_file = getpath('singlefile', tag)
-
- if os.path.exists(public_crt_file):
- print
- if expired(getcertdate(public_crt_file)):
- print 'Certificate has expired. Ready to re-generate.'
- else:
- print
- ans = raw_input('This certificate seems to exist already (in %s).\nAre you really sure that you want to re-create it? [y/N] ' % crt_file)
- if not ans or ans[0].lower() != 'y':
- sys.exit(0)
- print 'Revoking previous certificate...'
- openssl('ca', '-config', conf_file,
- '-revoke', public_crt_file)
-
-
- # create custom config file
- template(conf_file,
- openssl_conf_template,
- dict(
- ca_dir = ca_base,
- default_days = ca['default_days'],
- country = d2get(info, ca, 'country'),
- org = d2get(info, ca, 'org'),
- ou = d2get(info, ca, 'ou', ''),
- cn = info['cn'],
- email = d2get(info, ca, 'email')))
-
- # create dsa parameters
- openssl('dsaparam', '-out', dsa_parms_file, '1024')
-
- # create rsa key
- openssl('req', '-batch', '-new', '-keyout', rsa_key_file,
- '-config', conf_file, '-nodes', '-out', csr_file)
- openssl('req', '-batch', '-new', '-newkey', 'dsa:' + dsa_parms_file,
- '-keyout', dsa_key_file, '-nodes',
- '-config', conf_file, '-out', dsa_csr_file)
-
- # create ext file
- altnames = [ x.strip() for x in info['alt_names'].split(',') ]
- altnames_s = ''
- for i in range(len(altnames)):
- altnames_s += 'DNS.%d=%s\n' % (i + 1, altnames[i])
- template(ext_file,
- ext_template,
- dict(
- ca_name = ca['name'],
- ca_base_url = ca['base_url'],
- alt_names = altnames_s))
-
- # sign requests
- openssl('ca', '-days', ca['default_days'],
- '-config', conf_file, '-batch',
- '-policy', 'policy_anything',
- '-out', crt_file,
- '-extfile', ext_file,
- '-infiles', csr_file)
- openssl('ca', '-days', ca['default_days'],
- '-config', conf_file, '-batch',
- '-policy', 'policy_anything',
- '-out', dsa_crt_file,
- '-extfile', ext_file,
- '-infiles', dsa_csr_file)
- f = open(public_crt_file, 'w')
- f.write(open(crt_file, 'r').read())
- f.write(open(dsa_crt_file, 'r').read())
- f.close()
-
- # create single-file file
- f = open(sf_file, 'w')
- f.write(open(crt_file, 'r').read())
- f.write(open(dsa_crt_file, 'r').read())
- f.write(open(rsa_key_file, 'r').read())
- f.write(open(dsa_key_file, 'r').read())
- f.close()
-
- logging.info('created certificate %s [%s]' % (tag, info['cn']))
-
- print '''
-Certificate '%s':
-
- CN: %s
- AltNames: %s
-
- RSA key: %s
- DSA key: %s
- public crt: %s
- all-in-one: %s
-
-''' % (tag, info['cn'], ', '.join(altnames),
- rsa_key_file, dsa_key_file, public_crt_file, sf_file)
+++ /dev/null
-
-
-import os
-import logging
-from utils import *
-from cfg import *
-
-
-def initfs():
- mkdir(ca_base)
- for sub in [ 'archive', 'ext', 'conf',
- 'private', 'private/certs', 'private/single-file',
- 'public', 'public/certs', 'public/crl', 'newcerts' ]:
- mkdir(os.path.join(ca_base, sub))
- os.chmod(os.path.join(ca_base, 'private'), 0700)
- logging.info('created directory structure')
+++ /dev/null
-
-import os, re
-import commands
-from time import time, strftime, localtime
-from utils import *
-from templates import *
-from paths import *
-from cfg import *
-
-
-
-def list(args):
- verbose = 0
- if len(args) > 0 and args[0] == '-v':
- verbose = 1
-
- print '''
-
-List of known certificates for '%s' CA:
-
-''' % ca['name']
-
- i = 0
- sections = cfg.sections()
- sections.sort()
- for sec in sections:
- if sec == 'ca' or sec == 'global':
- continue
- i += 1
- crt_file = getpath('rsa_crt', sec)
- dsa_crt_file = getpath('dsa_crt', sec)
- if os.path.exists(crt_file):
- crt_date = getcertdate(crt_file)
- days = int((time() - crt_date) / 86400)
- if days > 0:
- days_s = '%d days ago' % days
- else:
- days_s = '%d days left' % (-days)
- if expired(crt_date):
- m = 'EXPIRED %s' % days_s.upper()
- star = '*'
- else:
- if verbose:
- m = '\n%21s Expiration: %s (%s)' % \
- (' ',
- strftime('%d %b %Y', localtime(crt_date)),
- days_s)
- else:
- m = 'exp. %s' % strftime('%d/%m/%Y',
- localtime(crt_date))
- star = ' '
- else:
- m = 'NOT FOUND'
- star = '*'
- print '%3d. %s%-15s %s [%s] - %s' % (i, star, sec, cfg.get(sec, 'cn'), cfg.get(sec, 'alt_names').replace('\n', ' '), m)
- if star != '*' and verbose:
- # do fingerprints
- for hname, hash in [ ('MD5', 'md5'), ('SHA1', 'sha1') ]:
- for cypher, file in [ ( 'RSA', crt_file), ( 'DSA', dsa_crt_file ) ]:
- fp = fingerprint(hash, file)
- print '%21s %s %s fingerprint: %s' % ('', cypher, hname, fp)
-
- if verbose:
- print
-
- print '''
-(* = certificate does not exist, create with 'cam gen <name>')
-'''
-
+++ /dev/null
-
-import os, logging
-from utils import *
-from templates import *
-from cfg import *
-
-
-def newca():
-
- conf_file = os.path.join(ca_base, 'conf/ca.conf')
- ca_file = os.path.join(ca_base, 'public/ca.pem')
- ca_dsa_file = os.path.join(ca_base, 'public/ca-dsa.tmp')
- ca_key_file = os.path.join(ca_base, 'private/ca.key')
- ca_dsa_key_file = os.path.join(ca_base, 'private/ca-dsa.key')
- ca_csr_file = os.path.join(ca_base, 'newcerts/ca.csr')
- ca_dsa_csr_file = os.path.join(ca_base, 'newcerts/ca-dsa.csr')
- dsa_parms_file = os.path.join(ca_base, 'private/ca.dsap')
-
- serial_file = os.path.join(ca_base, 'serial')
- index_file = os.path.join(ca_base, 'index')
- if not os.path.exists(serial_file):
- open(serial_file, 'w').write('01')
- if not os.path.exists(index_file):
- open(index_file, 'w').close()
-
- template(conf_file,
- openssl_conf_template,
- dict(
- ca_dir = ca_base,
- default_days = ca['default_days'],
- country = ca['country'],
- org = ca['org'],
- ou = ca.get('ou', ''),
- cn = ca['name'],
- email = ca['email']))
- if not os.path.exists(dsa_parms_file):
- openssl('dsaparam', '-out', dsa_parms_file, '1024')
- logging.info('generated CA DSA parameters')
- if not os.path.exists(ca_file):
- openssl('req', '-new', '-keyout', ca_key_file,
- '-config', conf_file, '-batch',
- '-out', ca_csr_file)
- openssl('req', '-new', '-newkey', 'dsa:' + dsa_parms_file,
- '-config', conf_file, '-batch',
- '-keyout', ca_dsa_key_file,
- '-out', ca_dsa_csr_file)
- openssl('ca',
- '-config', conf_file, '-batch',
- '-keyfile', ca_key_file,
- '-extensions', 'v3_ca',
- '-out', ca_file, '-selfsign',
- '-infiles', ca_csr_file)
- openssl('ca',
- '-config', conf_file, '-batch',
- '-keyfile', ca_dsa_key_file,
- '-extensions', 'v3_ca',
- '-out', ca_dsa_file, '-selfsign',
- '-infiles', ca_dsa_csr_file)
- open(ca_file, 'a').write(open(ca_dsa_file, 'r').read())
- os.remove(ca_dsa_file)
- logging.info('created CA certificates')
-
+++ /dev/null
-
-__all__ = [ 'getpath' ]
-
-import os
-from cfg import *
-
-
-path_exts = dict(
- conf = 'conf/%s.conf',
- rsa_key = 'private/%s.key',
- dsa_key = 'private/%s-dsa.key',
- dsa_parms = 'private/%s.dsap',
- rsa_csr = 'newcerts/%s.csr',
- dsa_csr = 'newcerts/%s-dsa.csr',
- rsa_crt = 'private/certs/%s.pem',
- dsa_crt = 'private/certs/%s-dsa.pem',
- ext = 'ext/%s.ext',
- public_crt = 'public/certs/%s.pem',
- singlefile = 'private/single-file/%s.pem',
- )
-
-def getpath(what, tag):
- return os.path.join(ca_base, path_exts[what] % tag)
-
-
+++ /dev/null
-
-__all__ = [
- 'getcertdate', 'fingerprint', 'expired',
- 'cfg2dict', 'mkdir', 'template',
- 'dictmerge', 'd2get',
- 'openssl'
- ]
-
-
-import commands
-import os, re
-import time
-
-
-def getcertdate(crt_file):
- if os.path.exists(crt_file):
- o = commands.getoutput('openssl x509 -in %s -noout -dates' % crt_file)
- m = re.search(r'notAfter=(.*)', o)
- if m:
- return time.mktime(time.strptime(m.group(1),
- '%b %d %H:%M:%S %Y %Z'))
- return 0
-
-def expired(crtdate):
- if crtdate < time.time():
- return 1
- else:
- return 0
-
-def fingerprint(hash, crt_file):
- if os.path.exists(crt_file):
- o = commands.getoutput('openssl x509 -in %s -noout -fingerprint -%s' % (crt_file, hash))
- m = re.search(r'=(.*)$', o)
- if m:
- return m.group(1)
- return 'NOT FOUND'
-
-def cfg2dict(cfg, section):
- d = dict()
- for k, v in cfg.items(section):
- d[k] = v
- return d
-
-def mkdir(p):
- try:
- os.mkdir(p, 0755)
- print 'created directory \'%s\'' % p
- except OSError:
- pass
-
-def template(file, t, args):
- f = open(file, 'w')
- f.write(t % args)
- f.close()
-
-def dictmerge(d1, d2):
- out = dict()
- for d in [ d2, d1 ]:
- for k, v in d.items():
- out[k] = v
- return out
-
-def d2get(d1, d2, k, default=None):
- return d1.get(k, d2.get(k, default))
-
-def openssl(*args):
- cmd = "openssl " + ' '.join(["'%s'" % x for x in args])
- print 'executing "%s"' % cmd
- return os.system(cmd)
-
-
-
--- /dev/null
+
+[ca]
+cn = Example Certification Authority
+org = Example
+country = AA
+default_days = 365
+email = ca@domain.org
+crl_url = http://ca.domain.org/ca.crl
+
+[web]
+cn = www.domain.org
+alt_names = www1.domain.org www2.domain.org
+ www3.domain.org www4.domain.org www5.domain.org
+ou = Example web services
+
+[imap]
+cn = mail.domain.org
+alt_names = mail.domain.org imap.domain.org domain.org
+ou = Example mail services
+
+[smtp]
+cn = smtp.domain.org
+alt_names = mx1.domain.org mx2.domain.org mx3.domain.org
+ mx4.domain.org mx5.domain.org smtp.domain.org
+ mx.domain.org
+ou = Example mail services
+
--- /dev/null
+[egg_info]
+tag_build = .dev
+
--- /dev/null
+#!/usr/bin/python
+
+from setuptools import setup, find_packages
+
+setup(
+ name="cam",
+ version="2.0",
+ description="X509 Certification Authority management",
+ author="ale",
+ author_email="ale@incal.net",
+ url="http://code.autistici.org/p/cam",
+ install_requires=[],
+ setup_requires=[],
+ zip_safe=False,
+ packages=find_packages(),
+ entry_points={
+ "console_scripts": [
+ "cam = cam.main:main",
+ ],
+ },
+ )
+