Source code for synapse.lib.crypto.passwd

import os
import hmac
import base64
import hashlib
import logging
import binascii

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.coro as s_coro

from typing import AnyStr, Dict

logger = logging.getLogger(__name__)

PBKDF2 = 'pbkdf2'
PBKDF2_HASH = 'sha256'
PBKDF2_ITERATIONS = 310_000  # OWASP recommended value 20221004

DEFAULT_PTYP = PBKDF2

def _getPbkdf2(passwd):
    salt = os.urandom(32)
    func_params = {'salt': salt,
                   'iterations': PBKDF2_ITERATIONS,
                   'hash_name': PBKDF2_HASH,
                   }
    hashed = hashlib.pbkdf2_hmac(password=passwd.encode(), **func_params)
    shadow = {'type': PBKDF2,
              'hashed': hashed,
              'func_params': func_params
              }
    return shadow

def _verifyPbkdf2(passwd: AnyStr, shadow: Dict) -> bool:
    hashed = shadow.get('hashed')
    if hashed is None:
        raise s_exc.CryptoErr(mesg='No hashed in pbdkf2 shadow')

    func_params = shadow.get('func_params')
    if func_params is None:
        raise s_exc.CryptoErr(mesg='No func_params in pbdkf2 shadow')

    hash_name = func_params.get('hash_name')
    if hash_name is None:
        raise s_exc.CryptoErr(mesg='Missing hash_name in pbdkf2 shadow')

    salt = func_params.get('salt')
    if salt is None:
        raise s_exc.CryptoErr(mesg='Missing salt in pbdkf2 shadow')

    iterations = func_params.get('iterations')
    if iterations is None:
        raise s_exc.CryptoErr(mesg='Missing iterations in pbdkf2 shadow')

    check = hashlib.pbkdf2_hmac(hash_name=hash_name, password=passwd.encode(),
                                salt=salt, iterations=iterations)
    return hmac.compare_digest(hashed, check)

[docs]async def getPbkdf2(passwd: AnyStr) -> Dict: return await s_coro.executor(_getPbkdf2, passwd=passwd)
[docs]async def verifyPbkdf2(passwd: AnyStr, shadow: Dict) -> bool: return await s_coro.executor(_verifyPbkdf2, passwd=passwd, shadow=shadow)
_efuncs = { PBKDF2: getPbkdf2, } _vfuncs = { PBKDF2: verifyPbkdf2, } assert set(_efuncs.keys()) == set(_vfuncs.keys())
[docs]async def getShadowV2(passwd: AnyStr) -> Dict: ''' Get the shadow dictionary for a given password. Args: passwd (str): Password to hash. ptyp (str): The password hash type. Returns: dict: A dictionary containing shadowed password information. ''' func = _efuncs.get(DEFAULT_PTYP) if func is None: raise s_exc.CryptoErr(mesg=f'type [{DEFAULT_PTYP}] does not map to a known function', valu=DEFAULT_PTYP) return await func(passwd=passwd)
[docs]async def checkShadowV2(passwd: AnyStr, shadow: Dict) -> bool: ''' Check a password against a shadow dictionary. Args: passwd (str): Password to check. shadow (dict): Data to check the password against. Returns: bool: True if the password is valid, false otherwise. ''' ptyp = shadow.get('type') func = _vfuncs.get(ptyp) if func is None: raise s_exc.CryptoErr(mesg=f'type [{ptyp}] does not map to a known function', valu=ptyp) return await func(passwd=passwd, shadow=shadow)
[docs]async def generateApiKey(iden=None): if iden is None: iden = s_common.guid() else: if not s_common.isguid(iden): raise s_exc.CryptoErr(mesg=f'Invalid iden provided: {iden}, must be a guid.') secv = s_common.guid() key = base64.b64encode(s_common.uhex(iden) + s_common.uhex(secv), altchars=b'-_').decode('utf-8') shadow = await getShadowV2(secv) return iden, key, shadow
[docs]def parseApiKey(valu): try: buf = base64.b64decode(valu.encode('utf-8'), altchars=b'-_', validate=True) except binascii.Error as e: return False, f'{e}' if len(buf) != 32: return False, f'Incorrect length, got {len(valu)}' return True, (s_common.ehex(buf[:16]), s_common.ehex(buf[16:]))