import io
import os
import ssl
import shutil
import socket
import logging
import datetime
import collections
from typing import List, Tuple, Union
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.const as s_const
import synapse.lib.output as s_output
import synapse.lib.crypto.rsa as s_rsa
import cryptography.x509 as c_x509
import cryptography.hazmat.primitives.hashes as c_hashes
import cryptography.hazmat.primitives.asymmetric.rsa as c_rsa
import cryptography.hazmat.primitives.asymmetric.dsa as c_dsa
import cryptography.hazmat.primitives.asymmetric.types as c_types
import cryptography.hazmat.primitives.serialization as c_serialization
import cryptography.hazmat.primitives.serialization.pkcs12 as c_pkcs12
# Default certificate directory; the SYN_CERT_DIR environment variable overrides it.
defdir_default = '~/.syn/certs'
defdir = os.getenv('SYN_CERT_DIR', defdir_default)

logger = logging.getLogger(__name__)

# OID of the Netscape certificate type extension and the raw values used for each certificate role.
NSCERTTYPE_OID = '2.16.840.1.113730.1.1'
NSCERTTYPE_CLIENT = b'\x03\x02\x07\x80'  # client
NSCERTTYPE_SERVER = b'\x03\x02\x06@'  # server
NSCERTTYPE_OBJSIGN = b'\x03\x02\x04\x10'  # objsign

TEN_YEARS = 10 * s_const.year  # 10 years in milliseconds
TEN_YEARS_TD = datetime.timedelta(milliseconds=TEN_YEARS)

# Type aliases used by the annotations in this module.
StrOrNone = Union[str, None]
BytesOrNone = Union[bytes, None]
OutPutOrNone = Union[s_output.OutPut, None]
CertOrNone = Union[c_x509.Certificate, None]
PkeyOrNone = Union[c_rsa.RSAPrivateKey, c_dsa.DSAPrivateKey, None]
PkeyAndCert = Tuple[c_rsa.RSAPrivateKey, c_x509.Certificate]
PkeyAndBuilder = Tuple[c_rsa.RSAPrivateKey, c_x509.CertificateBuilder]
PrivKeyPubKeyBuilder = Tuple[Union[c_rsa.RSAPrivateKey, None], c_types.PublicKeyTypes, c_x509.CertificateBuilder]
Pkey = Union[c_rsa.RSAPrivateKey, c_dsa.DSAPrivateKey]
Pkcs12OrNone = Union[c_pkcs12.PKCS12KeyAndCertificates, None]
PkeyOrNoneAndCert = Tuple[Union[c_rsa.RSAPrivateKey, None], c_x509.Certificate]

# Used for handling CSRs
PubKeyOrNone = Union[c_types.PublicKeyTypes, None]
[docs]
def iterFqdnUp(fqdn):
    '''
    Yield a dotted name followed by each of its parent names.

    Args:
        fqdn (str): The dotted name to walk upward.

    Examples:
        Walk up from "cool.vertex.link"::

            names = list(iterFqdnUp('cool.vertex.link'))
            # ['cool.vertex.link', 'vertex.link', 'link']

    Yields:
        str: The name with zero or more leading labels removed.
    '''
    labels = fqdn.split('.')
    # str.split() always returns at least one element, so the full name is always yielded first.
    while labels:
        yield '.'.join(labels)
        labels = labels[1:]
def _initTLSServerCiphers():
    '''
    Create a cipher string that supports TLS 1.2 and TLS 1.3 ciphers which do not use RSA key exchange.

    Note:
        The results of this may be dynamic depending on the interpreter version and OpenSSL library in use.
        For Python 3.8 and below, the cipher list is a subset of the normal default ciphers which are commonly
        available. For Python 3.10+, the changes should be negligible.

        The resulting string is cached in the module global TLS_SERVER_CIPHERS and called at import time.

    Raises:
        s_exc.SynErr: If no cipher from the default server context passes the filter.

    Returns:
        str: An OpenSSL cipher string (cipher names joined with ":").
    '''
    ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)  # type: ssl.SSLContext

    names = [
        cipher.get('name')
        for cipher in ctx.get_ciphers()  # pragma: no cover
        if cipher.get('protocol') in ('TLSv1.2', 'TLSv1.3') and cipher.get('kea') != 'kx-rsa'
    ]

    if not names:  # pragma: no cover
        raise s_exc.SynErr(mesg='No valid TLS ciphers are available for this Python installation.')

    return ':'.join(names)


TLS_SERVER_CIPHERS = _initTLSServerCiphers()
def _verifyCertSignature(cert, cacert):
    '''
    Verify that cert was directly issued and signed by cacert.

    Only RSA CA keys are supported. This is consistent with the certdir
    key generation policy (4096-bit RSA only). Uses the cryptography
    library verify_directly_issued_by API which checks both the issuer
    name match and the cryptographic signature.

    Args:
        cert: The certificate to check.
        cacert: The candidate issuer certificate.

    Raises:
        s_exc.BadCertVerify: If the CA key is not an RSA key, or if verify_directly_issued_by
            rejects the certificate (issuer name mismatch, unsupported key/algorithm, bad signature).
    '''
    # Imported locally; the module level imports do not include the cryptography exception types.
    import cryptography.exceptions as c_exceptions

    pubkey = cacert.public_key()
    if not isinstance(pubkey, c_rsa.RSAPublicKey):
        raise s_exc.BadCertVerify(mesg=f'Unsupported CA key type: {type(pubkey).__name__}')

    try:
        cert.verify_directly_issued_by(cacert)
    except (ValueError, TypeError, c_exceptions.InvalidSignature) as e:
        # Translate the library errors so callers get the single exception type documented above.
        mesg = f'Unable to verify certificate issuer: {type(e).__name__}: {e}'
        raise s_exc.BadCertVerify(mesg=mesg) from e
[docs]
class Crl:
    '''
    A certificate revocation list (CRL) for one CA held in a CertDir.

    If a CRL file already exists for the CA, its revoked entries are loaded at construction so
    they are carried forward the next time the list is signed and saved.

    Args:
        cdir: The CertDir providing the CA certificate, CA key and CRL path.
        name (str): The name of the CA; also used as the CRL issuer common name.
    '''
    def __init__(self, cdir, name):

        self.name = name
        self.certdir = cdir
        self.path = self.certdir.genCrlPath(name)

        self.crlbuilder = c_x509.CertificateRevocationListBuilder().issuer_name(c_x509.Name([
            c_x509.NameAttribute(c_x509.NameOID.COMMON_NAME, name),
        ]))

        if os.path.isfile(self.path):
            with io.open(self.path, 'rb') as fd:
                crl = c_x509.load_pem_x509_crl(fd.read())

            # Seed the builder with the entries which were previously revoked.
            for revc in crl:
                self.crlbuilder = self.crlbuilder.add_revoked_certificate(revc)

    def revoke(self, cert: c_x509.Certificate) -> None:
        '''
        Revoke a certificate with the CRL.

        The certificate must have been issued by the CA for this CRL. The updated CRL is
        signed and written to disk before returning.

        Args:
            cert: The certificate to revoke.

        Raises:
            s_exc.BadCertVerify: If the certificate cannot be verified against the CA certificate.

        Returns:
            None
        '''
        try:
            self._verify(cert)
        except s_exc.BadCertVerify as e:
            raise s_exc.BadCertVerify(mesg=f'Failed to validate that certificate was signed by {self.name}') from e

        now = datetime.datetime.now(datetime.UTC)

        builder = c_x509.RevokedCertificateBuilder()
        builder = builder.serial_number(cert.serial_number)
        builder = builder.revocation_date(now)
        builder = builder.add_extension(c_x509.CRLReason(c_x509.ReasonFlags.unspecified), critical=False)
        revoked_cert = builder.build()

        self.crlbuilder = self.crlbuilder.add_revoked_certificate(revoked_cert)
        # Use the revocation time as the CRL last-update time.
        self._save(now)

    def _verify(self, cert):
        '''
        Verify that the certificate was issued by the CA associated with this CRL.

        Uses verify_directly_issued_by (via _verifyCertSignature) to check both issuer name and
        cryptographic signature.

        Raises:
            s_exc.BadCertVerify: If the CA certificate is missing or the verification fails.
        '''
        cacert = self.certdir.getCaCert(self.name)
        if cacert is None:
            # Fail with a clear message instead of an AttributeError on None further down.
            raise s_exc.BadCertVerify(mesg=f'Missing CA certificate for {self.name}')

        try:
            _verifyCertSignature(cert, cacert)
        except s_exc.BadCertVerify:
            raise
        except Exception as e:
            raise s_exc.BadCertVerify(mesg=str(e)) from None

    def _save(self, timestamp: datetime.datetime | None = None) -> None:
        '''
        Sign the CRL with the CA key and write it to the CRL path as PEM.

        Args:
            timestamp: The last-update time for the CRL. Defaults to the current UTC time.
        '''
        if timestamp is None:
            timestamp = datetime.datetime.now(datetime.UTC)

        self.crlbuilder = self.crlbuilder.last_update(timestamp)
        # We have to have a next updated time; but we set it to be >= the lifespan of our certificates in general.
        self.crlbuilder = self.crlbuilder.next_update(timestamp + TEN_YEARS_TD)

        prvkey = self.certdir.getCaKey(self.name)
        crl = self.crlbuilder.sign(private_key=prvkey, algorithm=c_hashes.SHA256())

        with s_common.genfile(self.path) as fd:
            # Drop any previous contents before writing the new list.
            fd.truncate(0)
            fd.write(crl.public_bytes(c_serialization.Encoding.PEM))
[docs]
class CertDir:
'''
Certificate loading/generation/signing utilities.
Features:
* Locates and load certificates, keys, and certificate signing requests (CSRs).
* Generates keypairs for users, hosts, and certificate authorities (CAs), supports both signed and self-signed.
* Generates certificate signing requests (CSRs) for users, hosts, and certificate authorities (CAs).
* Signs certificate signing requests (CSRs).
* Generates PKCS#12 archives for use in browser.
Args:
path (str): Optional path which can override the default path directory.
Notes:
* All certificates will be loaded from and written to ~/.syn/certs by default. Set the environment variable
SYN_CERT_DIR to override.
* All certificate generation methods create 4096 bit RSA keypairs.
* All certificate signing methods use sha256 as the signature algorithm.
* CertDir does not currently support signing CA CSRs.
'''
def __init__(self, path: StrOrNone = None):
    '''
    Set the key size / digest defaults and register the initial certificate directories.

    Args:
        path: A directory path, or a list/tuple of directory paths. The module level defdir is used when None.
    '''
    self.crypto_numbits = 4096
    self.signing_digest = c_hashes.SHA256

    self.certdirs = []
    self.pathrefs = collections.defaultdict(int)

    if path is None:
        certpaths = (defdir,)
    elif isinstance(path, (list, tuple)):
        certpaths = path
    else:
        certpaths = (path,)

    for certpath in certpaths:
        self.addCertPath(certpath)
[docs]
def addCertPath(self, *path: str):
    '''
    Register a directory to search for certificates and keys.

    Directories are reference counted; a directory is appended to the search list only the
    first time it is added.

    Args:
        *path: The path parts, which are passed to s_common.genpath().
    '''
    fullpath = s_common.genpath(*path)

    refs = self.pathrefs[fullpath] + 1
    self.pathrefs[fullpath] = refs

    if refs == 1:
        self.certdirs.append(fullpath)
[docs]
def delCertPath(self, *path: str):
    '''
    Drop one reference to a certificate directory added with addCertPath().

    The directory is removed from the search list once its last reference is dropped.
    Removing a directory which is not currently registered is a no-op.

    Args:
        *path: The path parts, which are passed to s_common.genpath().
    '''
    fullpath = s_common.genpath(*path)

    # Use .get() so an unknown path does not create a defaultdict entry (and a negative count).
    refs = self.pathrefs.get(fullpath)
    if refs is None:
        return

    refs -= 1
    if refs > 0:
        self.pathrefs[fullpath] = refs
        return

    self.pathrefs.pop(fullpath, None)
    if fullpath in self.certdirs:
        self.certdirs.remove(fullpath)
[docs]
def genCaCert(self, name: str,
              signas: StrOrNone = None,
              outp: OutPutOrNone = None,
              save: bool = True) -> PkeyAndCert:
    '''
    Generates a CA keypair and certificate.

    Args:
        name: The name of the CA keypair.
        signas: The CA keypair to sign the new CA with. The certificate is self-signed when None.
        outp: The output buffer.
        save: Save the certificate and key to disk.

    Examples:
        Make a CA named "myca"::

            mycakey, mycacert = cdir.genCaCert('myca')

    Returns:
        Tuple containing the private key and certificate objects.
    '''
    prvkey = self._genPrivKey()

    builder = self._genCertBuilder(name, prvkey.public_key())
    constraints = c_x509.BasicConstraints(ca=True, path_length=None)
    builder = builder.add_extension(constraints, critical=False)

    if signas is None:
        cert = self.selfSignCert(builder, prvkey)
    else:
        cert = self.signCertAs(builder, signas)

    if not save:
        return prvkey, cert

    keypath = self._savePkeyTo(prvkey, 'cas', '%s.key' % name)
    if outp is not None:
        outp.printf('key saved: %s' % (keypath,))

    crtpath = self._saveCertTo(cert, 'cas', '%s.crt' % name)
    if outp is not None:
        outp.printf('cert saved: %s' % (crtpath,))

    return prvkey, cert
[docs]
def genHostCert(self, name: str,
                signas: StrOrNone = None,
                outp: OutPutOrNone = None,
                csr: PubKeyOrNone = None,
                sans: StrOrNone = None,
                save: bool = True) -> PkeyOrNoneAndCert:
    '''
    Generates a host certificate, creating a new private key unless a CSR public key is provided.

    Args:
        name: The name of the host keypair. It is always included as a DNS alternative name.
        signas: The CA keypair to sign the new host keypair with. The certificate is self-signed when None.
        outp: The output buffer.
        csr: The CSR public key when generating the keypair from a CSR.
        sans: String of comma separated alternative names, each prefixed with "DNS:", "email:" or "URI:".
        save: Save the certificate (and the key, if one was generated) to disk.

    Examples:
        Make a host keypair named "myhost"::

            myhostkey, myhostcert = cdir.genHostCert('myhost')

    Raises:
        s_exc.BadArg: If an alternative name does not start with a supported prefix.

    Returns:
        Tuple containing the private key and certificate objects. Private key may be None when signing a CSR.
    '''
    prvkey = None
    pubkey = csr
    if csr is None:
        prvkey = self._genPrivKey()
        pubkey = prvkey.public_key()

    builder = self._genCertBuilder(name, pubkey)

    # (prefix, bucket, constructor) listed in the order the alternative names are emitted.
    sanspecs = (
        ('DNS:', 'dns', c_x509.DNSName),
        ('email:', 'email', c_x509.RFC822Name),
        ('URI:', 'uri', c_x509.UniformResourceIdentifier),
    )

    ext_sans = collections.defaultdict(set)
    ext_sans['dns'].add(name)

    if sans:
        for san in sans.split(','):
            for prefix, bucket, _ in sanspecs:
                if san.startswith(prefix):
                    ext_sans[bucket].add(san[len(prefix):])
                    break
            else:
                raise s_exc.BadArg(mesg=f'Unsupported san value: {san}')

    # Values are de-duplicated by the sets and sorted within each name type.
    altnames = [ctor(valu) for _, bucket, ctor in sanspecs for valu in sorted(ext_sans[bucket])]

    nscerttype = c_x509.UnrecognizedExtension(oid=c_x509.ObjectIdentifier(NSCERTTYPE_OID),
                                              value=NSCERTTYPE_SERVER)
    builder = builder.add_extension(nscerttype, critical=False)

    keyusage = c_x509.KeyUsage(digital_signature=True, key_encipherment=True, data_encipherment=False,
                               key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False,
                               decipher_only=False, content_commitment=False)
    builder = builder.add_extension(keyusage, critical=False)

    extusage = c_x509.ExtendedKeyUsage([c_x509.oid.ExtendedKeyUsageOID.SERVER_AUTH])
    builder = builder.add_extension(extusage, critical=False)

    builder = builder.add_extension(c_x509.BasicConstraints(ca=False, path_length=None), critical=False)
    builder = builder.add_extension(c_x509.SubjectAlternativeName(altnames), critical=False)

    if signas is None:
        cert = self.selfSignCert(builder, prvkey)
    else:
        cert = self.signCertAs(builder, signas)

    if not save:
        return prvkey, cert

    # There is no private key to save when the certificate was built from a CSR.
    if prvkey is not None:
        keypath = self._savePkeyTo(prvkey, 'hosts', '%s.key' % name)
        if outp is not None:
            outp.printf('key saved: %s' % (keypath,))

    crtpath = self._saveCertTo(cert, 'hosts', '%s.crt' % name)
    if outp is not None:
        outp.printf('cert saved: %s' % (crtpath,))

    return prvkey, cert
[docs]
def genHostCsr(self, name: str, outp: OutPutOrNone = None) -> bytes:
    '''
    Generates a certificate signing request for a host.

    Args:
        name: The name of the host CSR.
        outp: The output buffer.

    Examples:
        Generate a CSR for the host key named "myhost"::

            cdir.genHostCsr('myhost')

    Returns:
        The bytes of the CSR.
    '''
    csrbyts = self._genPkeyCsr(name, 'hosts', outp=outp)
    return csrbyts
[docs]
def delHostCsr(self, name: str, outp: OutPutOrNone = None) -> bool:
    '''
    Remove a host CSR file if one exists.

    Args:
        name: The name of the host CSR.
        outp: The output buffer.

    Raises:
        s_exc.SynErr: If the CSR file exists but cannot be deleted.

    Returns:
        bool: True if the CSR is deleted, False if it did not exist.
    '''
    path = self.getHostCsrPath(name)
    if path is not None:
        try:
            os.unlink(path)
        except Exception as e:  # pragma: no cover
            raise s_exc.SynErr(mesg=f'Failed to delete CSR {path} - {e}') from e

        if outp:
            outp.printf(f'Deleted CSR at {path}')

        return True

    return False
[docs]
def genUserCert(self,
                name: str,
                signas: StrOrNone = None,
                outp: OutPutOrNone = None,
                csr: PubKeyOrNone = None,
                save: bool = True) -> PkeyOrNoneAndCert:
    '''
    Generates a user certificate, creating a new private key unless a CSR public key is provided.

    Args:
        name: The name of the user keypair.
        signas: The CA keypair to sign the new user keypair with. The certificate is self-signed when None.
        outp: The output buffer.
        csr: The CSR public key when generating the keypair from a CSR.
        save: Save the certificate (and the key, if one was generated) to disk.

    Examples:
        Generate a user cert for the user "myuser"::

            myuserkey, myusercert = cdir.genUserCert('myuser')

    Returns:
        Tuple containing the key and certificate objects. The key is None when a CSR public key was given.
    '''
    prvkey = None
    pubkey = csr
    if csr is None:
        prvkey = self._genPrivKey()
        pubkey = prvkey.public_key()

    builder = self._genCertBuilder(name, pubkey)

    nscerttype = c_x509.UnrecognizedExtension(oid=c_x509.ObjectIdentifier(NSCERTTYPE_OID),
                                              value=NSCERTTYPE_CLIENT)
    builder = builder.add_extension(nscerttype, critical=False)

    keyusage = c_x509.KeyUsage(digital_signature=True, key_encipherment=False, data_encipherment=False,
                               key_agreement=False,
                               key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False,
                               content_commitment=False)
    builder = builder.add_extension(keyusage, critical=False)

    extusage = c_x509.ExtendedKeyUsage([c_x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH])
    builder = builder.add_extension(extusage, critical=False)

    builder = builder.add_extension(c_x509.BasicConstraints(ca=False, path_length=None), critical=False)

    if signas is None:
        cert = self.selfSignCert(builder, prvkey)
    else:
        cert = self.signCertAs(builder, signas)

    if not save:
        return prvkey, cert

    # There is no private key to save when the certificate was built from a CSR.
    if prvkey is not None:
        keypath = self._savePkeyTo(prvkey, 'users', '%s.key' % name)
        if outp is not None:
            outp.printf('key saved: %s' % (keypath,))

    crtpath = self._saveCertTo(cert, 'users', '%s.crt' % name)
    if outp is not None:
        outp.printf('cert saved: %s' % (crtpath,))

    return prvkey, cert
[docs]
def genCodeCert(self, name: str, signas: StrOrNone = None, outp: OutPutOrNone = None, save: bool = True) \
        -> PkeyAndCert:
    '''
    Generates a code signing keypair and certificate.

    Args:
        name: The name of the code signing cert.
        signas: The CA keypair to sign the new code keypair with. The certificate is self-signed when None.
        outp: The output buffer.
        save: Save the certificate and key to disk.

    Examples:
        Generate a code signing cert for the name "The Vertex Project"::

            myuserkey, myusercert = cdir.genCodeCert('The Vertex Project')

    Returns:
        Tuple containing the key and certificate objects.
    '''
    prvkey = self._genPrivKey()
    builder = self._genCertBuilder(name, prvkey.public_key())

    nscerttype = c_x509.UnrecognizedExtension(oid=c_x509.ObjectIdentifier(NSCERTTYPE_OID),
                                              value=NSCERTTYPE_OBJSIGN)
    builder = builder.add_extension(nscerttype, critical=False)

    keyusage = c_x509.KeyUsage(digital_signature=True, key_encipherment=False, data_encipherment=False,
                               key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False,
                               decipher_only=False, content_commitment=False)
    builder = builder.add_extension(keyusage, critical=False)

    extusage = c_x509.ExtendedKeyUsage([c_x509.oid.ExtendedKeyUsageOID.CODE_SIGNING])
    builder = builder.add_extension(extusage, critical=False)

    builder = builder.add_extension(c_x509.BasicConstraints(ca=False, path_length=None), critical=False)

    if signas is None:
        cert = self.selfSignCert(builder, prvkey)
    else:
        cert = self.signCertAs(builder, signas)

    if not save:
        return prvkey, cert

    keypath = self._savePkeyTo(prvkey, 'code', '%s.key' % name)
    if outp is not None:
        outp.printf('key saved: %s' % (keypath,))

    crtpath = self._saveCertTo(cert, 'code', '%s.crt' % name)
    if outp is not None:
        outp.printf('cert saved: %s' % (crtpath,))

    return prvkey, cert
[docs]
def getCodeKeyPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a code signing key.

    Args:
        name: The name of the code signing keypair.

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'code', f'{name}.key') for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getCodeCertPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a code signing certificate.

    Args:
        name: The name of the code signing keypair.

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'code', f'{name}.crt') for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getCodeKey(self, name: str) -> Union[s_rsa.PriKey | None]:
    '''
    Loads a code signing private key wrapped in a synapse.lib.crypto.rsa PriKey.

    Args:
        name: The name of the code signing keypair.

    Returns:
        The PriKey object, or None if no key file is found.
    '''
    keypath = self.getCodeKeyPath(name)
    if keypath is not None:
        return s_rsa.PriKey(self._loadKeyPath(keypath))
    return None
[docs]
def getCodeCert(self, name: str) -> CertOrNone:
    '''
    Loads the X509 object for a code signing certificate.

    Args:
        name: The name of the code signing keypair.

    Returns:
        The certificate, or None if no certificate file is found.
    '''
    certpath = self.getCodeCertPath(name)
    if certpath is not None:
        return self._loadCertPath(certpath)
    return None  # pragma: no cover
[docs]
def valCodeCert(self, byts: bytes) -> c_x509.Certificate:
    '''
    Verify a code cert is valid according to certdir's available CAs and CRLs.

    Args:
        byts: The certificate bytes.

    Raises:
        s_exc.BadCertBytes: If the certificate extended key usage is missing or is not exactly code signing.
        s_exc.BadCertVerify: If we are unable to verify the certificate chain.

    Returns:
        The certificate.
    '''
    reqext = c_x509.ExtendedKeyUsage([c_x509.oid.ExtendedKeyUsageOID.CODE_SIGNING])

    cert = self.loadCertByts(byts)
    extensions = cert.extensions

    try:
        eku = extensions.get_extension_for_oid(c_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE)
    except c_x509.ExtensionNotFound:
        raise s_exc.BadCertBytes(mesg='Certificate is not for code signing.') from None

    # The usage list must match exactly; extra usages are rejected as well.
    if reqext != eku.value:
        mesg = 'Certificate is not for code signing.'
        raise s_exc.BadCertBytes(mesg=mesg)

    crls = self._getCaCrls()
    cacerts = self.getCaCerts()
    self._verifyChain(cert, cacerts, crls)

    return cert
def _getCaCrls(self) -> List[c_x509.CertificateRevocationList]:
    '''
    Load every PEM encoded CRL file (*.crl) from the "crls" folder of each certificate directory.

    Returns:
        List of certificate revocation lists; empty if no CRL files are present.
    '''
    crls = []

    for cdir in self.certdirs:

        crlpath = os.path.join(cdir, 'crls')
        if os.path.isdir(crlpath):

            for name in os.listdir(crlpath):

                if name.endswith('.crl'):
                    with io.open(os.path.join(crlpath, name), 'rb') as fd:
                        crls.append(c_x509.load_pem_x509_crl(fd.read()))

    return crls
def _verifyChain(self, cert, cacerts, crls=None, _seen=None, _imm_depth=0, _max_depth=8):
    '''
    Recursively verify a certificate chain from cert up to a self-signed root.

    For each certificate in the chain this method verifies the issuer
    signature, checks BasicConstraints and path length limits per RFC 5280,
    validates the certificate time window, and checks CRLs for revocation.

    CRL checking is lenient: if no CRL exists for a given issuer the
    revocation check is skipped for that certificate. This allows
    independent CA chains to coexist in the same certdir when only some
    chains have CRLs.

    Args:
        cert: The certificate to verify.
        cacerts: List of trusted CA certificates.
        crls: Optional list of CRLs to check for revocation.
        _seen: Internal set of visited certificate fingerprints for cycle detection.
        _imm_depth: Internal counter tracking the accumulated CA depth for
                    path length constraint enforcement.
        _max_depth: Maximum allowed chain depth (default 8).

    Raises:
        s_exc.BadCertVerify: If any of the checks fails for cert or one of its issuers.
    '''
    if _seen is None:
        _seen = set()

    # One fingerprint is added per certificate visited, so the size of _seen is the current depth.
    if len(_seen) >= _max_depth:
        raise s_exc.BadCertVerify(mesg='Certificate chain exceeds maximum depth')

    certfp = cert.fingerprint(c_hashes.SHA256())
    if certfp in _seen:
        raise s_exc.BadCertVerify(mesg='Certificate chain cycle detected')
    _seen.add(certfp)

    # Pick the first CA certificate which verifies as the direct issuer; any failure just
    # means "not this one" and moves on to the next candidate.
    issuercert = None
    for cacert in cacerts:
        try:
            _verifyCertSignature(cert, cacert)
            issuercert = cacert
            break
        except Exception:
            continue

    if issuercert is None:
        raise s_exc.BadCertVerify(mesg='unable to get local issuer certificate')

    # Reject critical extensions other than the ones listed here.
    handled_oids = {
        c_x509.oid.ExtensionOID.BASIC_CONSTRAINTS,
        c_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE,
    }

    for chkcert in (cert, issuercert):
        for ext in chkcert.extensions:
            if ext.critical and ext.oid not in handled_oids:
                mesg = f'Unhandled critical extension {ext} on {chkcert.subject}'
                raise s_exc.BadCertVerify(mesg=mesg)

    # The issuer must carry BasicConstraints with ca=True.
    try:
        issuer_bc = issuercert.extensions.get_extension_for_oid(c_x509.oid.ExtensionOID.BASIC_CONSTRAINTS)
        if not issuer_bc.value.ca:
            raise s_exc.BadCertVerify(mesg=f'Issuer certificate is not a CA certificate {issuercert.subject}')
    except c_x509.ExtensionNotFound:
        mesg = f'Issuer certificate is missing BasicConstraints extension on {issuercert.subject}'
        raise s_exc.BadCertVerify(mesg=mesg) from None

    # The certificate being verified must carry BasicConstraints as well (ca may be True or False).
    try:
        cert_bc = cert.extensions.get_extension_for_oid(c_x509.oid.ExtensionOID.BASIC_CONSTRAINTS)
    except c_x509.ExtensionNotFound:
        mesg = f'Certificate is missing BasicConstraints extension on {cert.subject}'
        raise s_exc.BadCertVerify(mesg=mesg) from None

    cert_is_ca = cert_bc.value.ca

    # ca_depth counts the CA certificates seen so far at or below this point of the chain
    # (including cert itself when it is a CA); it is compared with the issuer path length limit.
    ca_depth = _imm_depth + (1 if cert_is_ca else 0)
    if issuer_bc.value.path_length is not None and ca_depth > issuer_bc.value.path_length:
        mesg = f'Certificate chain exceeds issuer path length constraint on {issuercert.subject}'
        raise s_exc.BadCertVerify(mesg=mesg)

    # Both the certificate and its issuer must be inside their validity window right now.
    now = datetime.datetime.now(datetime.UTC)
    if now > cert.not_valid_after_utc:
        raise s_exc.BadCertVerify(mesg=f'certificate has expired for {cert.subject}')
    if now < cert.not_valid_before_utc:
        raise s_exc.BadCertVerify(mesg=f'certificate is not yet valid for {cert.subject}')
    if now > issuercert.not_valid_after_utc:
        raise s_exc.BadCertVerify(mesg=f'issuer certificate has expired for {issuercert.subject}')
    if now < issuercert.not_valid_before_utc:
        raise s_exc.BadCertVerify(mesg=f'issuer certificate is not yet valid for {issuercert.subject}')

    # Only CRLs whose issuer name matches the issuer subject are consulted.
    if crls:
        for crl in crls:
            if crl.issuer != issuercert.subject:
                continue
            if crl.get_revoked_certificate_by_serial_number(cert.serial_number) is not None:
                raise s_exc.BadCertVerify(mesg='certificate revoked')

    # Continue upward unless the issuer is self-issued (issuer name equals subject name).
    if issuercert.issuer != issuercert.subject:
        self._verifyChain(issuercert, cacerts, crls, _seen, ca_depth, _max_depth)
[docs]
def genClientCert(self, name: str, outp: OutPutOrNone = None) -> None:
    '''
    Generates a user PKCS #12 archive and saves it as <name>.p12 under "users".

    Please note that the resulting file will contain private key material and is written
    without encryption.

    Args:
        name (str): The name of the user keypair.
        outp (synapse.lib.output.Output): The output buffer.

    Examples:
        Make the PKCS12 archive for user "myuser"::

            cdir.genClientCert('myuser')

    Raises:
        s_exc.NoSuchFile: If the user certificate, its CA certificate, or the user key is missing.

    Returns:
        None
    '''
    ucert = self.getUserCert(name)
    if not ucert:
        raise s_exc.NoSuchFile(mesg='missing User cert', name=name)

    capath = self._getCaPath(ucert)
    cacert = self._loadCertPath(capath)
    if not cacert:
        raise s_exc.NoSuchFile(mesg='missing CA cert', path=capath)

    ukey = self.getUserKey(name)
    if not ukey:
        raise s_exc.NoSuchFile(mesg='missing User private key', name=name)

    p12byts = c_pkcs12.serialize_key_and_certificates(
        name=name.encode('utf-8'),
        key=ukey,
        cert=ucert,
        cas=[cacert],
        encryption_algorithm=c_serialization.NoEncryption(),
    )

    crtpath = self._saveP12To(p12byts, 'users', '%s.p12' % name)
    if outp is None:
        return

    outp.printf('client cert saved: %s' % (crtpath,))
[docs]
def genUserCsr(self, name: str, outp: OutPutOrNone = None) -> bytes:
    '''
    Generates a certificate signing request for a user.

    Args:
        name: The name of the user CSR.
        outp: The output buffer.

    Examples:
        Generate a CSR for the user "myuser"::

            cdir.genUserCsr('myuser')

    Returns:
        The bytes of the CSR.
    '''
    csrbyts = self._genPkeyCsr(name, 'users', outp=outp)
    return csrbyts
[docs]
def delUserCsr(self, name: str, outp: OutPutOrNone = None) -> bool:
    '''
    Remove a user CSR file if one exists.

    Args:
        name: The name of the user CSR.
        outp: The output buffer.

    Raises:
        s_exc.SynErr: If the CSR file exists but cannot be deleted.

    Returns:
        bool: True if the CSR is deleted, False if it did not exist.
    '''
    path = self.getUserCsrPath(name)
    if path is not None:
        try:
            os.unlink(path)
        except Exception as e:  # pragma: no cover
            raise s_exc.SynErr(mesg=f'Failed to delete CSR {path} - {e}') from e

        if outp:
            outp.printf(f'Deleted CSR at {path}')

        return True

    return False
[docs]
def getCaCert(self, name: str) -> CertOrNone:
    '''
    Loads the X509 object for a given CA.

    Args:
        name: The name of the CA keypair.

    Examples:
        Get the certificate for the CA "myca"::

            mycacert = cdir.getCaCert('myca')

    Returns:
        The certificate, if exists.
    '''
    certpath = self.getCaCertPath(name)
    return self._loadCertPath(certpath)
[docs]
def getCaCertBytes(self, name: str) -> bytes:
path = self.getCaCertPath(name)
if os.path.exists(path):
with open(path, 'rb') as fd:
return fd.read()
[docs]
def getCaCerts(self) -> List[c_x509.Certificate]:
    '''
    Load every CA certificate (*.crt) found in the "cas" folder of each certificate directory.

    Returns:
        List of CA certificates.
    '''
    cacerts = []

    for cdir in self.certdirs:

        caspath = s_common.genpath(cdir, 'cas')
        if os.path.isdir(caspath):

            for fname in os.listdir(caspath):
                if fname.endswith('.crt'):
                    cacerts.append(self._loadCertPath(s_common.genpath(cdir, 'cas', fname)))

    return cacerts
[docs]
def getCaCertPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a CA certificate.

    Args:
        name: The name of the CA keypair.

    Examples:
        Get the path to the CA certificate for the CA "myca"::

            mypath = cdir.getCaCertPath('myca')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'cas', '%s.crt' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getCaKey(self, name) -> PkeyOrNone:
    '''
    Loads the private key object for a given CA keypair.

    Args:
        name: The name of the CA keypair.

    Examples:
        Get the private key for the CA "myca"::

            mycakey = cdir.getCaKey('myca')

    Returns:
        The private key, if exists.
    '''
    keypath = self.getCaKeyPath(name)
    return self._loadKeyPath(keypath)
[docs]
def getCaKeyPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a CA key.

    Args:
        name: The name of the CA keypair.

    Examples:
        Get the path to the private key for the CA "myca"::

            mypath = cdir.getCaKeyPath('myca')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'cas', '%s.key' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getClientCert(self, name: str) -> Pkcs12OrNone:
    '''
    Loads the PKCS12 archive object for a given user keypair.

    Args:
        name: The name of the user keypair.

    Examples:
        Get the PKCS12 object for the user "myuser"::

            mypkcs12 = cdir.getClientCert('myuser')

    Notes:
        The PKCS12 archive will contain private key material if it was created with CertDir or the easycert tool

    Returns:
        The PKCS12 archive, if exists.
    '''
    p12path = self.getClientCertPath(name)
    return self._loadP12Path(p12path)
[docs]
def getClientCertPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a client certificate (PKCS12 archive).

    Args:
        name: The name of the client keypair.

    Examples:
        Get the path to the client certificate for "myuser"::

            mypath = cdir.getClientCertPath('myuser')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'users', '%s.p12' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getHostCaPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to the CA certificate that issued a given host keypair.

    Args:
        name: The name of the host keypair.

    Examples:
        Get the path to the CA cert which issued the cert for "myhost"::

            mypath = cdir.getHostCaPath('myhost')

    Returns:
        The path, if exists. None when there is no host certificate for the name.
    '''
    hostcert = self.getHostCert(name)
    if hostcert is not None:
        return self._getCaPath(hostcert)
    return None
[docs]
def getHostCert(self, name: str) -> CertOrNone:
    '''
    Loads the X509 object for a given host keypair.

    Args:
        name: The name of the host keypair.

    Examples:
        Get the certificate object for the host "myhost"::

            myhostcert = cdir.getHostCert('myhost')

    Returns:
        The certificate, if exists.
    '''
    certpath = self.getHostCertPath(name)
    return self._loadCertPath(certpath)
[docs]
def getHostCertHash(self, name: str) -> StrOrNone:
    '''
    Gets the SHA256 fingerprint of a host certificate, encoded with s_common.ehex().

    Args:
        name: The name of the host keypair.

    Returns:
        The encoded fingerprint, or None when there is no host certificate for the name.
    '''
    hostcert = self.getHostCert(name)
    if hostcert is not None:
        digest = hostcert.fingerprint(c_hashes.SHA256())
        return s_common.ehex(digest)
    return None
[docs]
def getHostCertPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a host certificate.

    Args:
        name: The name of the host keypair.

    Examples:
        Get the path to the host certificate for the host "myhost"::

            mypath = cdir.getHostCertPath('myhost')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'hosts', '%s.crt' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getHostKey(self, name: str) -> PkeyOrNone:
    '''
    Loads the private key object for a given host keypair.

    Args:
        name: The name of the host keypair.

    Examples:
        Get the private key object for the host "myhost"::

            myhostkey = cdir.getHostKey('myhost')

    Returns:
        The private key, if exists.
    '''
    keypath = self.getHostKeyPath(name)
    return self._loadKeyPath(keypath)
[docs]
def getHostKeyPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a host key.

    Args:
        name: The name of the host keypair.

    Examples:
        Get the path to the host key for the host "myhost"::

            mypath = cdir.getHostKeyPath('myhost')

    Returns:
        str: The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'hosts', '%s.key' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getUserCaPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to the CA certificate that issued a given user keypair.

    Args:
        name: The name of the user keypair.

    Examples:
        Get the path to the CA cert which issued the cert for "myuser"::

            mypath = cdir.getUserCaPath('myuser')

    Returns:
        The path, if exists. None when there is no user certificate for the name.
    '''
    usercert = self.getUserCert(name)
    if usercert is not None:
        return self._getCaPath(usercert)
    return None
[docs]
def getUserCert(self, name: str) -> CertOrNone:
    '''
    Loads the X509 object for a given user keypair.

    Args:
        name: The name of the user keypair.

    Examples:
        Get the certificate object for the user "myuser"::

            myusercert = cdir.getUserCert('myuser')

    Returns:
        The certificate, if exists.
    '''
    certpath = self.getUserCertPath(name)
    return self._loadCertPath(certpath)
[docs]
def getUserCertPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a user certificate.

    Args:
        name (str): The name of the user keypair.

    Examples:
        Get the path for the user cert for "myuser"::

            mypath = cdir.getUserCertPath('myuser')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'users', '%s.crt' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getUserForHost(self, user: str, host: str) -> StrOrNone:
    '''
    Gets the name of the first existing user cert for a given user and host.

    Candidate names have the form "user@fqdn" and are tried for the host name first and
    then for each of its parent domains.

    Args:
        user: The name of the user.
        host: The name of the host.

    Examples:
        Get the name for the "myuser" user cert at "cool.vertex.link"::

            usercertname = cdir.getUserForHost('myuser', 'cool.vertex.link')

    Returns:
        str: The cert name, if exists.
    '''
    candidates = ('%s@%s' % (user, name) for name in iterFqdnUp(host))
    return next(filter(self.isUserCert, candidates), None)
[docs]
def getUserKey(self, name: str) -> PkeyOrNone:
    '''
    Loads the private key object for a given user keypair.

    Args:
        name: The name of the user keypair.

    Examples:
        Get the key object for the user key for "myuser"::

            myuserkey = cdir.getUserKey('myuser')

    Returns:
        The private key, if exists.
    '''
    keypath = self.getUserKeyPath(name)
    return self._loadKeyPath(keypath)
[docs]
def getUserKeyPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a user key.

    Args:
        name: The name of the user keypair.

    Examples:
        Get the path to the user key for "myuser"::

            mypath = cdir.getUserKeyPath('myuser')

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'users', '%s.key' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getUserCsrPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a user certificate signing request.

    Args:
        name: The name of the user CSR.

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'users', '%s.csr' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def getHostCsrPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a host certificate signing request.

    Args:
        name: The name of the host CSR.

    Returns:
        The path of the first matching file found in the certificate directories, or None.
    '''
    candidates = (s_common.genpath(cdir, 'hosts', '%s.csr' % name) for cdir in self.certdirs)
    return next(filter(os.path.isfile, candidates), None)
[docs]
def importFile(self, path: str, mode: str, outp: OutPutOrNone = None) -> None:
    '''
    Imports certs and keys into the Synapse cert directory

    The file is copied, keeping its file name, into the "mode" subdirectory of the first
    registered certificate directory.

    Args:
        path: The path of the file to be imported.
        mode: The certdir subdirectory to import the file into.
        outp: The output buffer.

    Examples:
        Import CA certificate 'mycoolca.crt' to the 'cas' directory::

            certdir.importFile('mycoolca.crt', 'cas')

    Notes:
        importFile does not perform any validation on the files it imports.

    Raises:
        s_exc.NoSuchFile: If path is not an existing file.
        s_exc.BadFileExt: If the file extension is not one of crt, key or p12.
        s_exc.FileExists: If the destination file already exists.

    Returns:
        None
    '''
    if not os.path.isfile(path):
        raise s_exc.NoSuchFile(mesg=f'File {path} does not exist', path=path)

    fname = os.path.basename(path)

    # ext is None when the name has no dot at all, and '' when the name ends with a dot.
    _, dot, ext = fname.rpartition('.')
    if not dot:
        ext = None

    if ext not in ('crt', 'key', 'p12'):
        mesg = 'importFile only supports .crt, .key, .p12 extensions'
        raise s_exc.BadFileExt(mesg=mesg, ext=ext)

    newpath = s_common.genpath(self.certdirs[0], mode, fname)
    if os.path.isfile(newpath):
        raise s_exc.FileExists(mesg=f'File {newpath} already exists', path=path)

    s_common.gendir(os.path.dirname(newpath))
    shutil.copy(path, newpath)

    if outp is None:
        return

    outp.printf('copied %s to %s' % (path, newpath))
[docs]
def isCaCert(self, name: str) -> bool:
    '''
    Checks if a CA certificate exists.

    Args:
        name: The name of the CA keypair.

    Examples:
        Check if the CA certificate for "myca" exists::

            exists = cdir.isCaCert('myca')

    Returns:
        True if the certificate is present, False otherwise.
    '''
    certpath = self.getCaCertPath(name)
    return certpath is not None
[docs]
def isClientCert(self, name: str) -> bool:
    '''
    Checks if a user client certificate (PKCS12) exists.

    Args:
        name: The name of the user keypair.

    Examples:
        Check if the client certificate "myuser" exists::

            exists = cdir.isClientCert('myuser')

    Notes:
        The path is built with self._getPathJoin('users', '<name>.p12') and
        then tested with os.path.isfile().

    Returns:
        True if the certificate is present, False otherwise.
    '''
    return os.path.isfile(self._getPathJoin('users', '%s.p12' % name))
[docs]
def isHostCert(self, name: str) -> bool:
    '''
    Checks if a host certificate exists.

    Args:
        name: The name of the host keypair.

    Examples:
        Check if the host cert "myhost" exists::

            exists = cdir.isHostCert('myhost')

    Returns:
        True if self.getHostCertPath(name) finds a path, False otherwise.
    '''
    hostpath = self.getHostCertPath(name)
    return hostpath is not None
[docs]
def isUserCert(self, name: str) -> bool:
    '''
    Checks if a user certificate exists.

    Args:
        name: The name of the user keypair.

    Examples:
        Check if the user cert "myuser" exists::

            exists = cdir.isUserCert('myuser')

    Returns:
        True if self.getUserCertPath(name) finds a path, False otherwise.
    '''
    userpath = self.getUserCertPath(name)
    return userpath is not None
[docs]
def isCodeCert(self, name: str) -> bool:
    '''
    Checks if a code certificate exists.

    Args:
        name: The name of the code keypair.

    Examples:
        Check if the code cert "mypipeline" exists::

            exists = cdir.isCodeCert('mypipeline')

    Returns:
        True if self.getCodeCert(name) returns something other than None, False otherwise.
    '''
    codecert = self.getCodeCert(name)
    return codecert is not None
[docs]
def signCertAs(self, builder: c_x509.CertificateBuilder, signas: str) -> c_x509.Certificate:
    '''
    Signs a certificate builder with a CA keypair.

    Args:
        builder: The certificate builder to set the issuer on and sign.
        signas: The CA keypair name to sign the new certificate with.

    Examples:
        Sign a certificate builder with the CA "myca"::

            cert = cdir.signCertAs(builder, 'myca')

    Raises:
        NoCertKey: If either the CA private key or the CA certificate cannot be loaded.

    Returns:
        The signed certificate.
    '''
    cakey = self.getCaKey(signas)
    if cakey is None:
        raise s_exc.NoCertKey(mesg=f'Missing .key for {signas}')

    cacert = self.getCaCert(signas)
    if cacert is None:
        raise s_exc.NoCertKey(mesg=f'Missing .crt for {signas}')

    # The issuer of the new certificate is the common name of the CA's own subject.
    cnattrs = cacert.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)
    issuer = c_x509.Name([
        c_x509.NameAttribute(c_x509.NameOID.COMMON_NAME, cnattrs[0].value),
    ])

    return builder.issuer_name(issuer).sign(
        private_key=cakey,
        algorithm=self.signing_digest(),
    )
[docs]
def signHostCsr(self, xcsr: c_x509.CertificateSigningRequest,
                signas: str,
                outp: OutPutOrNone = None,
                sans: StrOrNone = None,
                save: bool = True) -> PkeyOrNoneAndCert:
    '''
    Signs a host CSR with a CA keypair.

    Args:
        xcsr: The certificate signing request.
        signas: The CA keypair name to sign the CSR with.
        outp: The output buffer.
        sans: List of subject alternative names.
        save: Passed through unchanged to genHostCert().

    Examples:
        Sign a host key with the CA "myca"::

            cdir.signHostCsr(mycsr, 'myca')

    Returns:
        The value returned by genHostCert() for the CSR's common name and public key.
    '''
    pubkey = xcsr.public_key()
    # The host name is taken from the first common name of the CSR subject.
    cnattr = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0]
    return self.genHostCert(
        cnattr.value,
        csr=pubkey,
        signas=signas,
        outp=outp,
        sans=sans,
        save=save,
    )
[docs]
def selfSignCert(self, builder: c_x509.CertificateBuilder, pkey: Pkey) -> c_x509.Certificate:
    '''
    Self-sign a certificate.

    Args:
        builder: The certificate builder to sign; its subject name must already be set.
        pkey: The PKey with which to sign the certificate.

    Examples:
        Sign a given certificate builder with a given private key::

            cert = cdir.selfSignCert(builder, myotherprivatekey)

    Returns:
        The signed certificate, whose issuer common name equals its subject common name.
    '''
    # NOTE: reads the builder's private _subject_name attribute to copy the subject CN.
    cnattrs = builder._subject_name.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)
    issuer = c_x509.Name([
        c_x509.NameAttribute(c_x509.NameOID.COMMON_NAME, cnattrs[0].value),
    ])

    return builder.issuer_name(issuer).sign(
        private_key=pkey,
        algorithm=self.signing_digest(),
    )
[docs]
def signUserCsr(self, xcsr: c_x509.CertificateSigningRequest,
                signas: str,
                outp: OutPutOrNone = None,
                save: bool = True) -> PkeyOrNoneAndCert:
    '''
    Signs a user CSR with a CA keypair.

    Args:
        xcsr: The certificate signing request.
        signas: The CA keypair name to sign the CSR with.
        outp: The output buffer.
        save: Passed through unchanged to genUserCert().

    Examples:
        Sign a user CSR with "myca"::

            cdir.signUserCsr(mycsr, 'myca')

    Returns:
        The value returned by genUserCert() for the CSR's common name and public key.
    '''
    pubkey = xcsr.public_key()
    # The user name is taken from the first common name of the CSR subject.
    cnattr = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0]
    return self.genUserCert(
        cnattr.value,
        csr=pubkey,
        signas=signas,
        outp=outp,
        save=save,
    )
def _loadCasIntoSSLContext(self, ctx):
    '''
    Load every "*.crt" file from the "cas" subdirectory of each cert directory
    into ctx as a trusted verify location.

    Args:
        ctx: The ssl.SSLContext (or compatible object) to load the CA files into.
    '''
    for dirn in self.certdirs:
        cadir = s_common.genpath(dirn, 'cas')
        if os.path.isdir(cadir):
            for fname in os.listdir(cadir):
                if not fname.endswith('.crt'):
                    continue
                ctx.load_verify_locations(os.path.join(cadir, fname))
[docs]
def getClientSSLContext(self, certname: StrOrNone = None) -> ssl.SSLContext:
    '''
    Returns an ssl.SSLContext appropriate for initiating a TLS session.

    The context is created for server authentication with the
    VERIFY_X509_STRICT flag cleared and a minimum version of TLS 1.2, and
    the CA certificates found in the cert directories are loaded into it.

    Args:
        certname: If specified, use the user certificate with the matching
                  name to authenticate to the remote service. A name of the
                  form "user@host" is resolved through getUserForHost().

    Raises:
        NoSuchCert: If no user certificate can be found for certname.
        NoCertKey: If the user certificate has no matching private key.

    Returns:
        A SSLContext object.
    '''
    sslctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    sslctx.verify_flags &= ~ssl.VERIFY_X509_STRICT
    sslctx.minimum_version = ssl.TLSVersion.TLSv1_2
    self._loadCasIntoSSLContext(sslctx)

    # Without a client certificate name the context is already complete.
    if certname is None:
        return sslctx

    username = certname
    if '@' in username:
        user, _, host = username.partition('@')
        username = self.getUserForHost(user, host)
        if username is None:
            mesg = f'User certificate not found: {certname}'
            raise s_exc.NoSuchCert(mesg=mesg)

    certpath = self.getUserCertPath(username)
    if certpath is None:
        mesg = f'User certificate not found: {certname}'
        raise s_exc.NoSuchCert(mesg=mesg)

    keypath = self.getUserKeyPath(username)
    if keypath is None:
        mesg = f'User private key not found: {certname}'
        raise s_exc.NoCertKey(mesg=mesg)

    sslctx.load_cert_chain(certpath, keypath)
    return sslctx
[docs]
def getCrlPath(self, name: str) -> StrOrNone:
    '''
    Gets the path to a certificate revocation list (CRL) file.

    Args:
        name: The name used for the "<name>.crl" file.

    Returns:
        The first existing "crls/<name>.crl" path found while walking
        self.certdirs in order, or None when no such file exists.
    '''
    fname = '%s.crl' % name
    candidates = (s_common.genpath(dirn, 'crls', fname) for dirn in self.certdirs)
    # The generator is lazy, so directories after the first match are never probed.
    return next((path for path in candidates if os.path.isfile(path)), None)
[docs]
def genCrlPath(self, name: str) -> str:
    '''
    Gets the path of an existing CRL file, or the path where a new one belongs.

    Args:
        name: The name used for the "<name>.crl" file.

    Returns:
        The path found by getCrlPath() when there is one. Otherwise
        s_common.gendir() is called for the "crls" directory under the first
        cert directory and the "<name>.crl" path inside it is returned
        (the file itself is not created here).
    '''
    found = self.getCrlPath(name)
    if found is not None:
        return found

    basedir = self.certdirs[0]
    s_common.gendir(basedir, 'crls')
    return os.path.join(basedir, 'crls', f'{name}.crl')
[docs]
def genCaCrl(self, name: str) -> Crl:
    '''
    Get the CRL for a given CA.

    Args:
        name: The CA name.

    Returns:
        A new Crl object constructed from this cert directory and the CA name.
    '''
    crl = Crl(self, name)
    return crl
def _getServerSSLContext(self, hostname=None, caname=None) -> ssl.SSLContext:
    '''
    Build a server-side SSLContext loaded with a host certificate and key.

    Args:
        hostname: The host keypair name; defaults to socket.gethostname().
        caname: Optional CA name. When given, client certificates are required
                and verified against that CA's certificate.

    Raises:
        NoCertKey: If the host certificate or the host key file is missing.
        NoSuchCert: If caname is given but its CA certificate is missing.

    Returns:
        The configured SSLContext, starting from the module-level getServerSSLContext().
    '''
    sslctx = getServerSSLContext()

    if hostname is None:
        hostname = socket.gethostname()

    certfile = self.getHostCertPath(hostname)
    if certfile is None:
        raise s_exc.NoCertKey(mesg=f'Missing TLS certificate file for host: {hostname}')

    keyfile = self.getHostKeyPath(hostname)
    if keyfile is None:
        raise s_exc.NoCertKey(mesg=f'Missing TLS key file for host: {hostname}')

    sslctx.load_cert_chain(certfile, keyfile)

    # No CA name means no client certificate verification is configured here.
    if caname is None:
        return sslctx

    cafile = self.getCaCertPath(caname)
    if cafile is None:
        raise s_exc.NoSuchCert(mesg=f'Missing CA Certificate for {caname}')

    sslctx.verify_mode = ssl.VerifyMode.CERT_REQUIRED
    sslctx.load_verify_locations(cafile=cafile)
    return sslctx
[docs]
def saveCertPem(self, cert: c_x509.Certificate, path: str) -> None:
    '''
    Save a certificate in PEM format to a file outside the certdir.

    Args:
        cert: The certificate to serialize.
        path: The destination file path; any existing content is truncated away first.
    '''
    with s_common.genfile(path) as fd:
        fd.truncate(0)
        pem = cert.public_bytes(c_serialization.Encoding.PEM)
        fd.write(pem)
[docs]
def savePkeyPem(self, pkey: c_types.PrivateKeyTypes, path: str) -> None:
    '''
    Save a private key in PEM format to a file outside the certdir.

    The key is written unencrypted in the TraditionalOpenSSL private format.

    Args:
        pkey: The private key to serialize.
        path: The destination file path; any existing content is truncated away first.
    '''
    pem = pkey.private_bytes(
        encoding=c_serialization.Encoding.PEM,
        format=c_serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=c_serialization.NoEncryption(),
    )
    with s_common.genfile(path) as fd:
        fd.truncate(0)
        fd.write(pem)
def _getCertName(self, cert):
    '''
    Get the first Common Name (CN) value from a certificate's subject.

    Args:
        cert: The certificate to inspect.

    Raises:
        BadCertBytes: If the subject has no Common Name attribute.

    Returns:
        The value of the first Common Name attribute.
    '''
    cnattrs = cert.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)
    if cnattrs:
        return cnattrs[0].value
    raise s_exc.BadCertBytes(mesg='Certificate is missing a Common Name (CN) subject field.')
[docs]
def saveCaCertByts(self, byts: bytes) -> str:
    '''
    Load a certificate from bytes and save it as a CA certificate.

    Args:
        byts: The certificate bytes, as accepted by self._loadCertByts().

    Returns:
        The value returned by self._saveCertTo() for "cas/<common name>.crt".
    '''
    cert = self._loadCertByts(byts)
    fname = f'{self._getCertName(cert)}.crt'
    return self._saveCertTo(cert, 'cas', fname)
[docs]
def saveHostCertByts(self, byts: bytes) -> str:
    '''
    Load a certificate from bytes and save it as a host certificate.

    Args:
        byts: The certificate bytes, as accepted by self._loadCertByts().

    Returns:
        The value returned by self._saveCertTo() for "hosts/<common name>.crt".
    '''
    cert = self._loadCertByts(byts)
    fname = f'{self._getCertName(cert)}.crt'
    return self._saveCertTo(cert, 'hosts', fname)
[docs]
def saveUserCertByts(self, byts: bytes) -> str:
    '''
    Load a certificate from bytes and save it as a user certificate.

    Args:
        byts: The certificate bytes, as accepted by self._loadCertByts().

    Returns:
        The value returned by self._saveCertTo() for "users/<common name>.crt".
    '''
    cert = self._loadCertByts(byts)
    fname = f'{self._getCertName(cert)}.crt'
    return self._saveCertTo(cert, 'users', fname)
[docs]
def saveCodeCertBytes(self, byts: bytes) -> str:
    '''
    Load a certificate from bytes and save it as a code signing certificate.

    Args:
        byts: The certificate bytes, as accepted by self._loadCertByts().

    Returns:
        The value returned by self._saveCertTo() for "code/<common name>.crt".
    '''
    cert = self._loadCertByts(byts)
    fname = f'{self._getCertName(cert)}.crt'
    return self._saveCertTo(cert, 'code', fname)
def _checkDupFile(self, path) -> None:
    '''
    Refuse to continue when a file already exists at path.

    Args:
        path: The file path about to be written.

    Raises:
        DupFileName: If path is an existing file.
    '''
    if not os.path.isfile(path):
        return
    raise s_exc.DupFileName(mesg=f'Duplicate file {path}', path=path)
def _genPrivKey(self) -> c_rsa.RSAPrivateKey:
    '''
    Generate a new RSA private key.

    Returns:
        An RSA private key with public exponent 65537 and a key size of self.crypto_numbits.
    '''
    return c_rsa.generate_private_key(
        public_exponent=65537,
        key_size=self.crypto_numbits,
    )
def _genCertBuilder(self, name: str, pubkey: c_types.PublicKeyTypes) -> c_x509.CertificateBuilder:
    '''
    Create an unsigned certificate builder for the given name and public key.

    The builder gets the name as its subject common name, a validity window
    starting now (UTC) and lasting TEN_YEARS_TD, a serial number derived from
    s_common.guid(), and the given public key.

    Args:
        name: The common name; must be 1-64 bytes long when utf8-encoded.
        pubkey: The public key to embed in the certificate.

    Raises:
        CryptoErr: If the encoded name length is outside 1-64 bytes.

    Returns:
        The populated CertificateBuilder (no issuer set, not signed).
    '''
    size = len(name.encode('utf-8'))
    if size < 1 or size > 64:
        mesg = f'Certificate name values must be between 1-64 bytes when utf8-encoded. got name={name}, len={len(name.encode("utf-8"))}'
        raise s_exc.CryptoErr(mesg=mesg)

    subject = c_x509.Name([
        c_x509.NameAttribute(c_x509.NameOID.COMMON_NAME, name),
    ])
    now = datetime.datetime.now(datetime.UTC)

    return (
        c_x509.CertificateBuilder()
        .subject_name(subject)
        .not_valid_before(now)
        .not_valid_after(now + TEN_YEARS_TD)  # certificates are good for 10 years
        .serial_number(int(s_common.guid(), 16))
        .public_key(pubkey)
    )
def _genPkeyCsr(self, name: str, mode: str, outp: OutPutOrNone = None) -> bytes:
    '''
    Generate a new private key plus a certificate signing request (CSR) for
    it, and save both into a subdirectory of the first cert directory.

    The key is saved as "<mode>/<name>.key" and the CSR as "<mode>/<name>.csr".
    The CSR carries the name as its subject common name and a critical
    BasicConstraints(ca=False) extension, and is signed with SHA256.

    Args:
        name: The common name; must be 1-64 bytes long when utf8-encoded.
        mode: The certdir subdirectory to save the key and CSR into.
        outp: Optional output object used to report the saved paths.

    Raises:
        CryptoErr: If the encoded name length is outside 1-64 bytes.
        DupFileName: If the key file or the CSR file already exists.

    Returns:
        The PEM encoded bytes of the CSR.
    '''
    if not 1 <= len(name.encode('utf-8')) <= 64:
        mesg = f'CSR name values must be between 1-64 bytes when utf8-encoded. got name={name}, len={len(name.encode("utf-8"))}'
        raise s_exc.CryptoErr(mesg=mesg)

    # Validate both destinations before generating or writing anything. Checking the
    # CSR only after the key was saved would leave an orphaned key file behind when
    # the CSR already exists, and that key would then block every retry as well.
    # The key is checked first so the reported duplicate matches the write order.
    self._checkDupFile(self._getPathJoin(mode, '%s.key' % name))
    csrpath = self._getPathJoin(mode, '%s.csr' % name)
    self._checkDupFile(csrpath)

    pkey = self._genPrivKey()

    builder = c_x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(c_x509.Name([c_x509.NameAttribute(c_x509.NameOID.COMMON_NAME, name), ]))
    builder = builder.add_extension(c_x509.BasicConstraints(ca=False, path_length=None), critical=True)
    request = builder.sign(pkey, c_hashes.SHA256())

    keypath = self._savePkeyTo(pkey, mode, '%s.key' % name)
    if outp is not None:
        outp.printf('key saved: %s' % (keypath,))

    byts = request.public_bytes(c_serialization.Encoding.PEM)
    with s_common.genfile(csrpath) as fd:
        fd.truncate(0)
        fd.write(byts)

    if outp is not None:
        outp.printf('csr saved: %s' % (csrpath,))

    return byts
def _getCaPath(self, cert: c_x509.Certificate) -> StrOrNone:
    '''
    Get the path of the CA certificate named by a certificate's issuer.

    Args:
        cert: The certificate whose issuer common name is looked up.

    Returns:
        The result of self.getCaCertPath() for the first issuer common name.
    '''
    cnattrs = cert.issuer.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)
    caname = cnattrs[0].value
    return self.getCaCertPath(caname)
def _getPathBytes(self, path: str) -> BytesOrNone:
    '''
    Read the bytes stored at path.

    Args:
        path: The file path to read, or None.

    Returns:
        None when path is None, otherwise the result of s_common.getbytes(path).
    '''
    return None if path is None else s_common.getbytes(path)
#
def _getPathJoin(self, *paths: str) -> str:
    '''
    Get the base certdir path + paths.

    Args:
        *paths: Path elements to append below the first configured cert directory.

    Returns:
        The result of s_common.genpath() for the first cert directory and the given elements.
    '''
    basedir = self.certdirs[0]
    return s_common.genpath(basedir, *paths)
def _loadCertPath(self, path: str) -> CertOrNone:
    '''
    Load a certificate from a file path.

    Args:
        path: The certificate file path, or None.

    Returns:
        The certificate parsed by self._loadCertByts(), or None when
        self._getPathBytes() yields nothing (None or empty bytes).
    '''
    byts = self._getPathBytes(path)
    if not byts:
        return None
    return self._loadCertByts(byts)
[docs]
def loadCertByts(self, byts: bytes) -> c_x509.Certificate:
    '''
    Load a X509 certificate from its PEM encoded bytes.

    Public wrapper around self._loadCertByts().

    Args:
        byts: The PEM encoded bytes of the certificate.

    Returns:
        The X509 certificate.

    Raises:
        BadCertBytes: If the certificate bytes are invalid.
    '''
    cert = self._loadCertByts(byts)
    return cert
def _loadCertByts(self, byts: bytes) -> c_x509.Certificate:
    '''
    Parse PEM encoded bytes into an X509 certificate.

    Args:
        byts: The PEM encoded bytes of the certificate.

    Raises:
        BadCertBytes: If parsing fails for any reason; the original
                      exception is deliberately not chained (raised "from None").

    Returns:
        The parsed certificate.
    '''
    try:
        cert = c_x509.load_pem_x509_certificate(byts)
    except Exception as exc:
        raise s_exc.BadCertBytes(mesg=f'Failed to load bytes: {exc}') from None
    return cert
def _loadCsrPath(self, path: str) -> Union[c_x509.CertificateSigningRequest | None]:
    '''
    Load a certificate signing request from a file path.

    Args:
        path: The CSR file path, or None.

    Returns:
        The CSR parsed by self._loadCsrByts(), or None when
        self._getPathBytes() yields nothing (None or empty bytes).
    '''
    byts = self._getPathBytes(path)
    if not byts:
        return None
    return self._loadCsrByts(byts)
def _loadCsrByts(self, byts: bytes) -> c_x509.CertificateSigningRequest:
    '''
    Parse PEM encoded bytes into a certificate signing request.

    Args:
        byts: The PEM encoded bytes of the CSR.

    Returns:
        The parsed CSR. Parsing errors from the cryptography library are not wrapped.
    '''
    xcsr = c_x509.load_pem_x509_csr(byts)
    return xcsr
def _loadKeyPath(self, path: str) -> PkeyOrNone:
    '''
    Load an unencrypted PEM private key from a file path.

    Args:
        path: The key file path, or None.

    Raises:
        BadCertBytes: If the loaded key is neither an RSA nor a DSA private key.

    Returns:
        The private key, or None when self._getPathBytes() yields nothing
        (None or empty bytes).
    '''
    byts = self._getPathBytes(path)
    if not byts:
        return None

    pkey = c_serialization.load_pem_private_key(byts, password=None)
    if not isinstance(pkey, (c_rsa.RSAPrivateKey, c_dsa.DSAPrivateKey)):
        raise s_exc.BadCertBytes(mesg=f'Key is {pkey.__class__.__name__}, expected a DSA or RSA key, {path=}',
                                 path=path)
    return pkey
def _loadP12Path(self, path: str) -> Pkcs12OrNone:
    '''
    Load a password-less PKCS12 bundle from a file path.

    Args:
        path: The .p12 file path, or None.

    Returns:
        The loaded PKCS12 object, or None when self._getPathBytes() yields
        nothing (None or empty bytes).
    '''
    byts = self._getPathBytes(path)
    if not byts:
        return None
    return c_pkcs12.load_pkcs12(byts, password=None)
def _saveCertTo(self, cert: c_x509.Certificate, *paths: str) -> str:
    '''
    Save a certificate as PEM below the first cert directory.

    Args:
        cert: The certificate to save.
        *paths: Path elements of the destination, relative to the base cert directory.

    Raises:
        DupFileName: If the destination file already exists.

    Returns:
        The path the certificate was written to.
    '''
    dest = self._getPathJoin(*paths)
    self._checkDupFile(dest)
    with s_common.genfile(dest) as fd:
        fd.truncate(0)
        pem = self._certToByts(cert)
        fd.write(pem)
    return dest
def _certToByts(self, cert: c_x509.Certificate):
    '''
    Serialize a certificate.

    Args:
        cert: The certificate to serialize.

    Returns:
        The PEM encoded bytes of the certificate.
    '''
    pem = c_serialization.Encoding.PEM
    return cert.public_bytes(encoding=pem)
def _pkeyToByts(self, pkey: Pkey) -> bytes:
    '''
    Serialize a private key.

    Args:
        pkey: The private key to serialize.

    Returns:
        The unencrypted PEM bytes of the key in the TraditionalOpenSSL private format.
    '''
    options = {
        'encoding': c_serialization.Encoding.PEM,
        'format': c_serialization.PrivateFormat.TraditionalOpenSSL,
        'encryption_algorithm': c_serialization.NoEncryption(),
    }
    return pkey.private_bytes(**options)
def _savePkeyTo(self, pkey: Pkey, *paths: str):
    '''
    Save a private key as PEM below the first cert directory.

    Args:
        pkey: The private key to save.
        *paths: Path elements of the destination, relative to the base cert directory.

    Raises:
        DupFileName: If the destination file already exists.

    Returns:
        The path the key was written to.
    '''
    dest = self._getPathJoin(*paths)
    self._checkDupFile(dest)
    with s_common.genfile(dest) as fd:
        fd.truncate(0)
        pem = self._pkeyToByts(pkey)
        fd.write(pem)
    return dest
def _saveP12To(self, byts: bytes, *paths: str):
    '''
    Save already-serialized PKCS12 bytes below the first cert directory.

    Args:
        byts: The bytes to write; they are written as-is.
        *paths: Path elements of the destination, relative to the base cert directory.

    Raises:
        DupFileName: If the destination file already exists.

    Returns:
        The path the bytes were written to.
    '''
    dest = self._getPathJoin(*paths)
    self._checkDupFile(dest)
    with s_common.genfile(dest) as fd:
        fd.truncate(0)
        fd.write(byts)
    return dest
# Module-level CertDir instance, created at import time. It is the object
# returned by getCertDir() and the target of addCertPath() / delCertPath().
certdir = CertDir()
[docs]
def getCertDir() -> CertDir:
    '''
    Get the singleton CertDir instance.

    Returns:
        CertDir: The module-level certdir object.
    '''
    singleton = certdir
    return singleton
[docs]
def addCertPath(path):
    '''
    Add a path to the module-level CertDir instance.

    Args:
        path: The path handed to certdir.addCertPath().

    Returns:
        Whatever certdir.addCertPath() returns.
    '''
    retn = certdir.addCertPath(path)
    return retn
[docs]
def delCertPath(path):
    '''
    Remove a path from the module-level CertDir instance.

    Args:
        path: The path handed to certdir.delCertPath().

    Returns:
        Whatever certdir.delCertPath() returns.
    '''
    retn = certdir.delCertPath(path)
    return retn
[docs]
def getCertDirn() -> str:
    '''
    Get the expanded default path used by the singleton CertDir instance.

    Returns:
        str: The path string produced by s_common.genpath() for the module-level defdir.
    '''
    dirn = s_common.genpath(defdir)
    return dirn