csr_file = os.path.join(tmpdir, 'ca.csr')
log.info('creating new RSA CA CSR')
openssl_wrap.run_with_config(
- self.files.conf, 'req', '-new',
+ self.basedir, self.files.conf,
+ 'req', '-new',
'-passout', 'pass:%s' % self._getpw(),
'-keyout', self.files.private_key, '-out', csr_file)
log.info('self-signing RSA CA certificate')
openssl_wrap.run_with_config(
- self.files.conf, 'ca', '-keyfile', self.files.private_key,
+ self.basedir, self.files.conf,
+ 'ca', '-keyfile', self.files.private_key,
'-key', self._getpw(),
'-extensions', 'v3_ca', '-out', self.files.public_key,
'-days', self.config.get('days', self.config['default_days']),
def gencrl(self):
    """Regenerate the certificate revocation list (CRL) for this CA.

    Runs ``openssl ca -gencrl`` with the CA's own configuration file
    (``self.files.conf``) and base directory (``self.basedir``), writing
    the list to ``self.files.crl``.  The CA key passphrase comes from
    ``self._getpw()``.  The resulting file is then set to mode 0644.
    """
    log.info('generating CRL')
    openssl_wrap.run_with_config(
        self.basedir, self.files.conf,
        'ca', '-gencrl', '-out', self.files.crl,
        '-key', self._getpw())
    # ``0o644`` is accepted by Python 2.6+ and Python 3 alike, whereas the
    # bare ``0644`` spelling is a syntax error on Python 3.
    os.chmod(self.files.crl, 0o644)
def revoke(self, cert):
    """Revoke ``cert`` and then rebuild the CRL.

    ``cert`` must expose ``name`` (used for the log message) and
    ``public_key_file`` (the file handed to ``openssl ca -revoke``).
    The CRL is regenerated through ``self.gencrl()`` once the revocation
    command has returned.
    """
    log.info('revoking certificate %s', cert.name)
    # Base directory and config file first, then the openssl subcommand
    # and its options, in the order run_with_config expects.
    revoke_call = [
        self.basedir,
        self.files.conf,
        'ca',
        '-revoke', cert.public_key_file,
        '-key', self._getpw(),
    ]
    openssl_wrap.run_with_config(*revoke_call)
    self.gencrl()
utils.render(conf_file, 'openssl_config', conf)
utils.render(ext_file, 'ext_config', conf)
openssl_wrap.run_with_config(
- conf_file, 'req', '-new', '-keyout', cert.private_key_file,
+ self.basedir, conf_file,
+ 'req', '-new', '-keyout', cert.private_key_file,
'-nodes', '-out', csr_file)
os.chmod(cert.private_key_file, 0600)
openssl_wrap.run_with_config(
- conf_file, 'ca', '-days', conf['days'],
+ self.basedir, conf_file,
+ 'ca', '-days', conf['days'],
'-key', self._getpw(),
'-policy', 'policy_anything', '-out', cert.public_key_file,
'-extfile', ext_file, '-infiles', csr_file)
import logging
+import os
import subprocess
log = logging.getLogger(__name__)
pass
def run(*args, **env_vars):
    """Execute ``openssl`` with ``args`` and return its standard output.

    ``env_vars`` are extra environment variables for the child process.
    They are layered over a copy of the current environment, so
    ``os.environ`` itself is not modified.

    Raises:
        CommandError: if openssl exits with a non-zero status.
    """
    cmd = ['openssl']
    cmd.extend(args)
    # Work on a copy so per-call overrides do not leak into this process.
    env = dict(os.environ)
    env.update(env_vars)
    log.debug('executing "%s"', ' '.join(cmd))
    pipe = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
    stdout, _ = pipe.communicate()
    if pipe.returncode != 0:
        raise CommandError('openssl exited with status %d' % (
            pipe.returncode))
    return stdout
def run_with_config(caroot, config_file, *args):
    """Run an openssl subcommand against an explicit configuration file.

    The first element of ``args`` is the openssl subcommand.  The options
    ``-config <config_file> -batch`` are inserted directly after it and
    the remaining elements follow unchanged.  ``caroot`` is converted to
    an absolute path and handed to ``run`` as the ``CAROOT`` keyword
    argument.

    Returns whatever ``run`` returns.  Raises ``IndexError`` when no
    subcommand is given.
    """
    subcommand, remainder = args[0], args[1:]
    openssl_args = (subcommand, '-config', config_file, '-batch') + remainder
    return run(*openssl_args, CAROOT=os.path.abspath(caroot))
def test_run(self):
self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
pipe_stub = PopenStub(stdout='output')
- subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE
+ subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE, env=mox.IsA(dict)
).AndReturn(pipe_stub)
self.mox.ReplayAll()
result = openssl_wrap.run('test')
def test_run_fails(self):
self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
pipe_stub = PopenStub(returncode=1)
- subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE
+ subprocess.Popen(['openssl', 'test'], stdout=subprocess.PIPE, env=mox.IsA(dict)
).AndReturn(pipe_stub)
self.mox.ReplayAll()
def r():
self.mox.StubOutWithMock(subprocess, 'Popen', use_mock_anything=True)
pipe_stub = PopenStub(stdout='output')
subprocess.Popen(['openssl', 'test', '-config', 'conf', '-batch', 'arg'],
- stdout=subprocess.PIPE).AndReturn(pipe_stub)
+ stdout=subprocess.PIPE, env=mox.IsA(dict)).AndReturn(pipe_stub)
self.mox.ReplayAll()
- result = openssl_wrap.run_with_config('conf', 'test', 'arg')
+ result = openssl_wrap.run_with_config('.', 'conf', 'test', 'arg')
if __name__ == '__main__':