Source code for synapse.lib.aha

import os
import copy
import random
import asyncio
import logging
import collections

import cryptography.x509 as c_x509

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.daemon as s_daemon
import synapse.telepath as s_telepath

import synapse.lib.base as s_base
import synapse.lib.cell as s_cell
import synapse.lib.coro as s_coro
import synapse.lib.nexus as s_nexus
import synapse.lib.queue as s_queue
import synapse.lib.config as s_config
import synapse.lib.httpapi as s_httpapi
import synapse.lib.msgpack as s_msgpack
import synapse.lib.schemas as s_schemas
import synapse.lib.jsonstor as s_jsonstor
import synapse.lib.lmdbslab as s_lmdbslab

logger = logging.getLogger(__name__)

_provSvcSchema = {
    'type': 'object',
    'properties': {
        'name': {
            'type': 'string',
            'minLength': 1,
        },
        'provinfo': {
            'type': 'object',
            'properties': {
                'conf': {
                    'type': 'object',
                },
                'dmon:port': {
                    'type': 'integer',
                    'minimum': 0,
                    'maximum': 65535,
                },
                'https:port': {
                    'type': 'integer',
                    'minimum': 0,
                    'maximum': 65535,
                },
                'mirror': {
                    'type': 'string',
                    'minLength': 1,
                },
            }
        }
    },
    'additionalProperties': False,
    'required': ['name'],
}
provSvcSchema = s_config.getJsValidator(_provSvcSchema)


[docs] class AhaProvisionServiceV1(s_httpapi.Handler):
[docs] async def post(self): if not await self.reqAuthAdmin(): return body = self.getJsonBody(validator=provSvcSchema) if body is None: return name = body.get('name') provinfo = body.get('provinfo') try: url = await self.cell.addAhaSvcProv(name, provinfo=provinfo) except asyncio.CancelledError: # pragma: no cover raise except s_exc.SynErr as e: logger.exception(f'Error provisioning {name}') return self.sendRestErr(e.__class__.__name__, e.get('mesg', str(e))) except Exception as e: # pragma: no cover logger.exception(f'Error provisioning {name}') return self.sendRestErr(e.__class__.__name__, str(e)) return self.sendRestRetn({'url': url})
_getAhaSvcSchema = { 'type': 'object', 'properties': { 'network': { 'type': 'string', 'minLength': 1, 'default': None, }, }, 'additionalProperties': False, } getAhaScvSchema = s_config.getJsValidator(_getAhaSvcSchema)
[docs] class AhaServicesV1(s_httpapi.Handler):
[docs] async def get(self): if not await self.reqAuthAdmin(): return network = None if self.request.body: body = self.getJsonBody(validator=getAhaScvSchema) if body is None: return network = body.get('network') ret = [] try: async for info in self.cell.getAhaSvcs(network=network): ret.append(info) except asyncio.CancelledError: # pragma: no cover raise except Exception as e: # pragma: no cover logger.exception(f'Error getting Aha services.') return self.sendRestErr(e.__class__.__name__, str(e)) return self.sendRestRetn(ret)
[docs] class AhaApi(s_cell.CellApi):
[docs] @s_cell.adminapi() async def addAhaClone(self, host, port=27492, conf=None): return await self.cell.addAhaClone(host, port=port, conf=conf)
[docs] async def getAhaUrls(self, user='root'): ahaurls = await self.cell.getAhaUrls(user=user) if ahaurls is None: return () return ahaurls
[docs] async def getAhaSvc(self, name, filters=None): ''' Return an AHA service description dictionary for a service name. ''' svcinfo = await self.cell.getAhaSvc(name, filters=filters) if svcinfo is None: return None svcnetw = svcinfo.get('svcnetw') await self._reqUserAllowed(('aha', 'service', 'get', svcnetw)) svcinfo = s_msgpack.deepcopy(svcinfo) # suggest that the user of the remote service is the same username = self.user.name.split('@')[0] if svcinfo.get('svcinfo'): svcinfo['svcinfo']['urlinfo']['user'] = username return svcinfo
[docs] async def getAhaSvcs(self, network=None): ''' Yield AHA svcinfo dictionaries. Args: network (str): Optionally specify a network to filter on. ''' if network is None: await self._reqUserAllowed(('aha', 'service', 'get')) else: await self._reqUserAllowed(('aha', 'service', 'get', network)) async for info in self.cell.getAhaSvcs(network=network): yield info
[docs] async def addAhaSvc(self, name, info, network=None): ''' Register a service with the AHA discovery server. NOTE: In order for the service to remain marked "up" a caller must maintain the telepath link. ''' svcname, svcnetw, svcfull = self.cell._nameAndNetwork(name, network) await self._reqUserAllowed(('aha', 'service', 'add', svcnetw, svcname)) # dont disclose the real session... sess = s_common.guid(self.sess.iden) info['online'] = sess info.setdefault('ready', True) if self.link.sock is not None: host, port = self.link.sock.getpeername() urlinfo = info.get('urlinfo', {}) urlinfo.setdefault('host', host) async def fini(): if self.cell.isfini: # pragma: no cover mesg = f'{self.cell.__class__.__name__} is fini. Unable to set {name}@{network} as down.' logger.warning(mesg, await self.cell.getLogExtra(name=svcname, netw=svcnetw)) return logger.info(f'AhaCellApi fini, setting service offline [{name}]', extra=await self.cell.getLogExtra(name=svcname, netw=svcnetw)) coro = self.cell.setAhaSvcDown(name, sess, network=network) self.cell.schedCoro(coro) # this will eventually execute or get cancelled. self.onfini(fini) return await self.cell.addAhaSvc(name, info, network=network)
[docs] async def modAhaSvcInfo(self, name, svcinfo): for key in svcinfo.keys(): if key not in ('ready',): mesg = f'Editing AHA service info property ({key}) is not supported!' raise s_exc.BadArg(mesg=mesg) svcentry = await self.cell.getAhaSvc(name) if svcentry is None: return False svcnetw = svcentry.get('svcnetw') svcname = svcentry.get('svcname') await self._reqUserAllowed(('aha', 'service', 'add', svcnetw, svcname)) return await self.cell.modAhaSvcInfo(name, svcinfo)
[docs] async def getAhaSvcMirrors(self, name): ''' Return list of AHA svcinfo dictionaries for mirrors of a service. ''' svcinfo = await self.cell.getAhaSvc(name) if svcinfo is None: return None svcnetw = svcinfo.get('svcnetw') await self._reqUserAllowed(('aha', 'service', 'get', svcnetw)) svciden = svcinfo['svcinfo']['iden'] return await self.cell.getAhaSvcMirrors(svciden)
[docs] async def delAhaSvc(self, name, network=None): ''' Remove an AHA service entry. ''' svcname, svcnetw, svcfull = self.cell._nameAndNetwork(name, network) await self._reqUserAllowed(('aha', 'service', 'del', svcnetw, svcname)) return await self.cell.delAhaSvc(name, network=network)
[docs] async def getCaCert(self, network): await self._reqUserAllowed(('aha', 'ca', 'get')) return await self.cell.getCaCert(network)
[docs] async def genCaCert(self, network): await self._reqUserAllowed(('aha', 'ca', 'gen')) return await self.cell.genCaCert(network)
[docs] async def signHostCsr(self, csrtext, signas=None, sans=None): await self._reqUserAllowed(('aha', 'csr', 'host')) return await self.cell.signHostCsr(csrtext, signas=signas, sans=sans)
[docs] async def signUserCsr(self, csrtext, signas=None): await self._reqUserAllowed(('aha', 'csr', 'user')) return await self.cell.signUserCsr(csrtext, signas=signas)
[docs] @s_cell.adminapi() async def addAhaPool(self, name, info): return await self.cell.addAhaPool(name, info)
[docs] @s_cell.adminapi() async def delAhaPool(self, name): return await self.cell.delAhaPool(name)
[docs] @s_cell.adminapi() async def addAhaPoolSvc(self, poolname, svcname, info): return await self.cell.addAhaPoolSvc(poolname, svcname, info)
[docs] @s_cell.adminapi() async def delAhaPoolSvc(self, poolname, svcname): return await self.cell.delAhaPoolSvc(poolname, svcname)
[docs] async def iterPoolTopo(self, name): username = self.user.name.split('@')[0] async for item in self.cell.iterPoolTopo(name): # default to using the same username as we do for aha if item[0] == 'svc:add': item[1]['svcinfo']['urlinfo']['user'] = username yield item
[docs] async def getAhaPool(self, name): return await self.cell.getAhaPool(name)
[docs] async def getAhaPools(self): async for item in self.cell.getAhaPools(): yield item
[docs] async def getAhaServers(self): return await self.cell.getAhaServers()
[docs] async def getAhaServer(self, host, port): return await self.cell.getAhaServer(host, port)
[docs] @s_cell.adminapi() async def addAhaServer(self, server): return await self.cell.addAhaServer(server)
[docs] @s_cell.adminapi() async def delAhaServer(self, host, port): return await self.cell.delAhaServer(host, port)
[docs] @s_cell.adminapi() async def addAhaSvcProv(self, name, provinfo=None): ''' Provision the given relative service name within the configured network name. ''' return await self.cell.addAhaSvcProv(name, provinfo=provinfo)
[docs] @s_cell.adminapi() async def delAhaSvcProv(self, iden): ''' Remove a previously added provisioning entry by iden. ''' return await self.cell.delAhaSvcProv(iden)
[docs] @s_cell.adminapi() async def addAhaUserEnroll(self, name, userinfo=None, again=False): ''' Create and return a one-time user enroll key. ''' return await self.cell.addAhaUserEnroll(name, userinfo=userinfo, again=again)
[docs] @s_cell.adminapi() async def delAhaUserEnroll(self, iden): ''' Remove a previously added enrollment entry by iden. ''' return await self.cell.delAhaUserEnroll(iden)
[docs] @s_cell.adminapi() async def clearAhaSvcProvs(self): ''' Remove all unused service provisioning values. ''' return await self.cell.clearAhaSvcProvs()
[docs] @s_cell.adminapi() async def clearAhaUserEnrolls(self): ''' Remove all unused user enrollment provisioning values. ''' return await self.cell.clearAhaUserEnrolls()
[docs] @s_cell.adminapi() async def clearAhaClones(self): ''' Remove all unused AHA clone provisioning values. ''' return await self.cell.clearAhaClones()
[docs] class ProvDmon(s_daemon.Daemon): async def __anit__(self, aha): self.aha = aha await s_daemon.Daemon.__anit__(self) async def _getSharedItem(self, name): provinfo = await self.aha.getAhaSvcProv(name) if provinfo is not None: await self.aha.delAhaSvcProv(name) conf = provinfo.get('conf', {}) anam = conf.get('aha:name') anet = conf.get('aha:network') mesg = f'Retrieved service provisioning info for {anam}.{anet} iden {name}' logger.info(mesg, extra=await self.aha.getLogExtra(iden=name, name=anam, netw=anet)) return ProvApi(self.aha, provinfo) userinfo = await self.aha.getAhaUserEnroll(name) if userinfo is not None: unam = userinfo.get('name') mesg = f'Retrieved user provisioning info for {unam} iden {name}' logger.info(mesg, extra=await self.aha.getLogExtra(iden=name, name=unam)) await self.aha.delAhaUserEnroll(name) return EnrollApi(self.aha, userinfo) clone = await self.aha.getAhaClone(name) if clone is not None: host = clone.get('host') mesg = f'Retrieved AHA clone info for {host} iden {name}' logger.info(mesg, extra=await self.aha.getLogExtra(iden=name, host=host)) return CloneApi(self.aha, clone) mesg = f'Invalid provisioning identifier name={name}. This could be' \ f' caused by the re-use of a provisioning URL.' raise s_exc.NoSuchName(mesg=mesg, name=name)
[docs] class CloneApi: def __init__(self, aha, clone): self.aha = aha self.clone = clone
[docs] async def getCloneDef(self): return self.clone
[docs] async def readyToMirror(self): return await self.aha.readyToMirror()
[docs] async def iterNewBackupArchive(self, name=None, remove=False): async with self.aha.getLocalProxy() as proxy: async for byts in proxy.iterNewBackupArchive(name=name, remove=remove): yield byts
[docs] class EnrollApi: def __init__(self, aha, userinfo): self.aha = aha self.userinfo = userinfo
[docs] async def getUserInfo(self): user = self.userinfo.get('name') return { 'aha:urls': await self.aha.getAhaUrls(user=user), 'aha:user': user, 'aha:network': self.aha.conf.req('aha:network'), }
[docs] async def getCaCert(self): ahanetw = self.aha.conf.req('aha:network') return self.aha.certdir.getCaCertBytes(ahanetw)
[docs] async def signUserCsr(self, byts): ahauser = self.userinfo.get('name') ahanetw = self.aha.conf.req('aha:network') username = f'{ahauser}@{ahanetw}' xcsr = self.aha.certdir._loadCsrByts(byts) name = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0].value if name != username: mesg = f'Invalid user CSR CN={name}.' raise s_exc.BadArg(mesg=mesg) logger.info(f'Signing user CSR for [{username}], signas={ahanetw}', extra=await self.aha.getLogExtra(name=username, signas=ahanetw)) pkey, cert = self.aha.certdir.signUserCsr(xcsr, ahanetw, save=False) return self.aha.certdir._certToByts(cert)
[docs] class ProvApi: def __init__(self, aha, provinfo): self.aha = aha self.provinfo = provinfo
[docs] async def getProvInfo(self): return self.provinfo
[docs] async def getCaCert(self): ahanetw = self.aha.conf.req('aha:network') return self.aha.certdir.getCaCertBytes(ahanetw)
[docs] async def signHostCsr(self, byts): ahaname = self.provinfo['conf'].get('aha:name') ahanetw = self.provinfo['conf'].get('aha:network') hostname = f'{ahaname}.{ahanetw}' xcsr = self.aha.certdir._loadCsrByts(byts) name = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0].value if name != hostname: mesg = f'Invalid host CSR CN={name}.' raise s_exc.BadArg(mesg=mesg) logger.info(f'Signing host CSR for [{hostname}], signas={ahanetw}', extra=await self.aha.getLogExtra(name=hostname, signas=ahanetw)) pkey, cert = self.aha.certdir.signHostCsr(xcsr, ahanetw, save=False) return self.aha.certdir._certToByts(cert)
[docs] async def signUserCsr(self, byts): ahauser = self.provinfo['conf'].get('aha:user') ahanetw = self.provinfo['conf'].get('aha:network') username = f'{ahauser}@{ahanetw}' xcsr = self.aha.certdir._loadCsrByts(byts) name = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0].value if name != username: mesg = f'Invalid user CSR CN={name}.' raise s_exc.BadArg(mesg=mesg) logger.info(f'Signing user CSR for [{username}], signas={ahanetw}', extra=await self.aha.getLogExtra(name=username, signas=ahanetw)) pkey, cert = self.aha.certdir.signUserCsr(xcsr, ahanetw, save=False) return self.aha.certdir._certToByts(cert)
[docs] class AhaCell(s_cell.Cell): cellapi = AhaApi confbase = copy.deepcopy(s_cell.Cell.confbase) confbase['mirror']['hidedocs'] = False # type: ignore confbase['mirror']['hidecmdl'] = False # type: ignore confdefs = { 'clone': { 'hidecmdl': True, 'description': 'Bootstrap a clone from the AHA clone URL.', 'type': ['string', 'null'], }, 'dns:name': { 'description': 'The registered DNS name used to reach the AHA service.', 'type': ['string', 'null'], }, 'aha:urls': { 'description': 'Deprecated. AHA servers can now manage this automatically.', 'type': ['string', 'array'], 'items': {'type': 'string'}, }, 'provision:listen': { 'description': 'A telepath URL for the AHA provisioning listener.', 'type': ['string', 'null'], }, } # Rename the class and remove these two overrides in 3.0.0
[docs] @classmethod def getEnvPrefix(cls): return (f'SYN_AHA', f'SYN_{cls.__name__.upper()}', )
async def _initCellBoot(self): curl = self.conf.get('clone') if curl is None: return path = s_common.genpath(self.dirn, 'cell.guid') if os.path.isfile(path): logger.info('Cloning AHA: cell.guid detected. Skipping.') return logger.warning(f'Cloning AHA: {curl}') async with await s_telepath.openurl(curl) as proxy: clone = await proxy.getCloneDef() await self._initCloneCell(proxy) logger.warning('Cloning AHA: done!') conf = s_common.yamlload(self.dirn, 'cell.yaml') if conf is None: conf = {} conf.update(clone.get('conf', {})) s_common.yamlsave(conf, self.dirn, 'cell.yaml') self.conf.update(conf)
[docs] async def initServiceStorage(self): # TODO plumb using a remote jsonstor? dirn = s_common.gendir(self.dirn, 'slabs', 'jsonstor') slab = await s_lmdbslab.Slab.anit(dirn) slab.addResizeCallback(self.checkFreeSpace) self.jsonstor = await s_jsonstor.JsonStor.anit(slab, 'aha') # type: s_jsonstor.JsonStor async def fini(): await self.jsonstor.fini() await slab.fini() self.onfini(fini) self.slab.initdb('aha:provs') self.slab.initdb('aha:enrolls') self.slab.initdb('aha:clones') self.slab.initdb('aha:servers') self.slab.initdb('aha:pools') self.poolwindows = collections.defaultdict(list)
[docs] async def getAhaServer(self, host, port): lkey = s_msgpack.en((host, port)) byts = self.slab.get(lkey, db='aha:servers') if byts is not None: return s_msgpack.un(byts)
[docs] async def addAhaServer(self, server): host = server.get('host') port = server.setdefault('port', 27492) # avoid a noop nexus change... oldv = await self.getAhaServer(host, port) if s_common.flatten(server) == s_common.flatten(oldv): return False return await self._push('aha:server:add', server)
@s_nexus.Pusher.onPush('aha:server:add') async def _addAhaServer(self, server): # TODO schema host = server.get('host') port = server.get('port') lkey = s_msgpack.en((host, port)) byts = self.slab.get(lkey, db='aha:servers') if byts is not None: oldv = s_msgpack.un(byts) if s_common.flatten(server) == s_common.flatten(oldv): return False self.slab.put(lkey, s_msgpack.en(server), db='aha:servers') return True
[docs] @s_nexus.Pusher.onPushAuto('aha:server:del') async def delAhaServer(self, host, port): lkey = s_msgpack.en((host, port)) byts = self.slab.pop(lkey, db='aha:servers') if byts is None: return None return s_msgpack.un(byts)
[docs] async def getAhaServers(self): servers = [] for _, byts in self.slab.scanByFull(db='aha:servers'): servers.append(s_msgpack.un(byts)) return servers
[docs] async def iterPoolTopo(self, name): name = self._getAhaName(name) async with await s_queue.Window.anit(maxsize=1000) as wind: poolinfo = self._reqPoolInfo(name) # pre-load the current state for svcname in poolinfo.get('services'): svcitem = await self.jsonstor.getPathObj(('aha', 'svcfull', svcname)) if not svcitem: logger.warning(f'Pool ({name}) includes service ({svcname}) which does not exist.') continue await wind.put(('svc:add', svcitem)) # subscribe to changes self.poolwindows[name].append(wind) async def onfini(): self.poolwindows[name].remove(wind) wind.onfini(onfini) # iterate events... async for mesg in wind: yield mesg
def _initCellHttpApis(self): s_cell.Cell._initCellHttpApis(self) self.addHttpApi('/api/v1/aha/services', AhaServicesV1, {'cell': self}) self.addHttpApi('/api/v1/aha/provision/service', AhaProvisionServiceV1, {'cell': self})
[docs] async def initServiceRuntime(self): self.addActiveCoro(self._clearInactiveSessions) if self.isactive: # bootstrap a CA for our aha:network netw = self.conf.req('aha:network') if self.certdir.getCaCertPath(netw) is None: logger.info(f'Adding CA certificate for {netw}') await self.genCaCert(netw) name = self.conf.get('aha:name') if name is not None: host = f'{name}.{netw}' if self.certdir.getHostCertPath(host) is None: logger.info(f'Adding server certificate for {host}') await self._genHostCert(host, signas=netw) root = f'root@{netw}' await self._genUserCert(root, signas=netw) user = self.conf.get('aha:admin') if user is not None: await self._genUserCert(user, signas=netw)
def _getDnsName(self): # emulate the old aha name.network behavior if the # explicit option is not set. hostname = self.conf.get('dns:name') if hostname is not None: return hostname ahaname = self.conf.get('aha:name') ahanetw = self.conf.get('aha:network') if ahaname is not None and ahanetw is not None: return f'{ahaname}.{ahanetw}' def _getProvListen(self): lisn = self.conf.get('provision:listen') if lisn is not None: return lisn # this may not use _getDnsName() in order to maintain # backward compatibilty with aha name.network configs # that do not intend to listen for provisioning. hostname = self.conf.get('dns:name') if hostname is not None: return f'ssl://0.0.0.0:27272?hostname={hostname}' def _getDmonListen(self): lisn = self.conf.get('dmon:listen', s_common.novalu) if lisn is not s_common.novalu: return lisn network = self.conf.req('aha:network') dnsname = self._getDnsName() if dnsname is not None: return f'ssl://0.0.0.0?hostname={dnsname}&ca={network}' def _reqProvListen(self): lisn = self._getProvListen() if lisn is not None: return lisn mesg = 'The AHA server is not configured for provisioning.' raise s_exc.NeedConfValu(mesg=mesg)
[docs] async def initServiceNetwork(self): # bootstrap CA/host certs first network = self.conf.req('aha:network') hostname = self._getDnsName() if hostname is not None and network is not None: await self._genHostCert(hostname, signas=network) await s_cell.Cell.initServiceNetwork(self) # all AHA mirrors are registered if hostname is not None and self.sockaddr is not None: server = {'host': hostname, 'port': self.sockaddr[1]} await self.addAhaServer(server) self.provdmon = None provurl = self._getProvListen() if provurl is not None: self.provdmon = await ProvDmon.anit(self) self.onfini(self.provdmon) logger.info(f'provision listening: {provurl}') self.provaddr = await self.provdmon.listen(provurl)
async def _clearInactiveSessions(self): async for svc in self.getAhaSvcs(): if svc.get('svcinfo', {}).get('online') is None: continue current_sessions = {s_common.guid(iden) for iden in self.dmon.sessions.keys()} svcname = svc.get('svcname') network = svc.get('svcnetw') linkiden = svc.get('svcinfo').get('online') if linkiden not in current_sessions: logger.info(f'AhaCell activecoro setting service offline [{svcname}.{network}]', extra=await self.getLogExtra(name=svcname, netw=network)) await self.setAhaSvcDown(svcname, linkiden, network=network) # Wait until we are cancelled or the cell is fini. await self.waitfini() async def _waitAhaSvcOnline(self, name, timeout=None): name = self._getAhaName(name) while True: async with self.nexslock: retn = await self.getAhaSvc(name) if retn['svcinfo'].get('online') is not None: return retn waiter = self.waiter(1, f'aha:svcadd:{name}') if await waiter.wait(timeout=timeout) is None: raise s_exc.TimeOut(mesg=f'Timeout waiting for aha:svcadd:{name}') async def _waitAhaSvcDown(self, name, timeout=None): name = self._getAhaName(name) while True: async with self.nexslock: retn = await self.getAhaSvc(name) online = retn['svcinfo'].get('online') if online is None: return retn waiter = self.waiter(1, f'aha:svcdown:{name}') if await waiter.wait(timeout=timeout) is None: raise s_exc.TimeOut(mesg=f'Timeout waiting for aha:svcdown:{name}')
[docs] async def getAhaSvcs(self, network=None): path = ('aha', 'services') if network is not None: path = path + (network,) async for path, item in self.jsonstor.getPathObjs(path): yield item
def _nameAndNetwork(self, name, network): if network is None: svcfull = name try: svcname, svcnetw = name.split('.', 1) except ValueError: raise s_exc.BadArg(name=name, arg='name', mesg='Name must contain at least one "."') from None else: svcname = name svcnetw = network svcfull = f'{name}.{network}' return svcname, svcnetw, svcfull
[docs] @s_nexus.Pusher.onPushAuto('aha:svc:mod') async def modAhaSvcInfo(self, name, svcinfo): svcentry = await self.getAhaSvc(name) if svcentry is None: return False svcnetw = svcentry.get('svcnetw') svcname = svcentry.get('svcname') path = ('aha', 'services', svcnetw, svcname) for prop, valu in svcinfo.items(): await self.jsonstor.setPathObjProp(path, ('svcinfo', prop), valu) return True
[docs] @s_nexus.Pusher.onPushAuto('aha:svc:add') async def addAhaSvc(self, name, info, network=None): svcname, svcnetw, svcfull = self._nameAndNetwork(name, network) full = ('aha', 'svcfull', svcfull) path = ('aha', 'services', svcnetw, svcname) unfo = info.get('urlinfo') logger.info(f'Adding service [{svcfull}] from [{unfo.get("scheme")}://{unfo.get("host")}:{unfo.get("port")}]', extra=await self.getLogExtra(name=svcname, netw=svcnetw)) svcinfo = { 'name': svcfull, 'svcname': svcname, 'svcnetw': svcnetw, 'svcinfo': info, } await self.jsonstor.setPathObj(path, svcinfo) await self.jsonstor.setPathLink(full, path) # mostly for testing... await self.fire('aha:svcadd', svcinfo=svcinfo) await self.fire(f'aha:svcadd:{svcfull}', svcinfo=svcinfo)
def _getAhaName(self, name): # the modern version of names is absolute or ... if name.endswith('...'): return name[:-2] + self.conf.req('aha:network') return name
[docs] async def getAhaPool(self, name): name = self._getAhaName(name) byts = self.slab.get(name.encode(), db='aha:pools') if byts is not None: return s_msgpack.un(byts)
def _savePoolInfo(self, poolinfo): s_schemas.reqValidAhaPoolDef(poolinfo) name = poolinfo.get('name') self.slab.put(name.encode(), s_msgpack.en(poolinfo), db='aha:pools') def _loadPoolInfo(self, name): byts = self.slab.get(name.encode(), db='aha:pools') if byts is not None: return s_msgpack.un(byts) def _reqPoolInfo(self, name): poolinfo = self._loadPoolInfo(name) if poolinfo is not None: return poolinfo mesg = f'There is no AHA service pool named {name}.' raise s_exc.NoSuchName(mesg=mesg, name=name)
[docs] async def addAhaPool(self, name, info): name = self._getAhaName(name) if await self._getAhaSvc(name) is not None: mesg = f'An AHA service or pool is already using the name "{name}".' raise s_exc.DupName(mesg=mesg, name=name) info['name'] = name info['created'] = s_common.now() info['services'] = {} info.setdefault('creator', self.getDmonUser()) return await self._push('aha:pool:add', info)
[docs] async def getAhaPools(self): for lkey, byts in self.slab.scanByFull(db='aha:pools'): yield s_msgpack.un(byts)
@s_nexus.Pusher.onPush('aha:pool:add') async def _addAhaPool(self, info): self._savePoolInfo(info) return info
[docs] async def addAhaPoolSvc(self, poolname, svcname, info): info['created'] = s_common.now() info.setdefault('creator', self.getDmonUser()) return await self._push('aha:pool:svc:add', poolname, svcname, info)
@s_nexus.Pusher.onPush('aha:pool:svc:add') async def _addAhaPoolSvc(self, poolname, svcname, info): svcname = self._getAhaName(svcname) poolname = self._getAhaName(poolname) svcitem = await self._reqAhaSvc(svcname) poolinfo = self._loadPoolInfo(poolname) poolinfo['services'][svcname] = info self._savePoolInfo(poolinfo) for wind in self.poolwindows.get(poolname, ()): await wind.put(('svc:add', svcitem)) return poolinfo
[docs] @s_nexus.Pusher.onPushAuto('aha:pool:del') async def delAhaPool(self, name): name = self._getAhaName(name) byts = self.slab.pop(name.encode(), db='aha:pools') for wind in self.poolwindows.get(name, ()): await wind.fini() if byts is not None: return s_msgpack.un(byts)
[docs] @s_nexus.Pusher.onPushAuto('aha:pool:svc:del') async def delAhaPoolSvc(self, poolname, svcname): svcname = self._getAhaName(svcname) poolname = self._getAhaName(poolname) poolinfo = self._reqPoolInfo(poolname) poolinfo['services'].pop(svcname, None) self._savePoolInfo(poolinfo) for wind in self.poolwindows.get(poolname, ()): await wind.put(('svc:del', {'name': svcname})) return poolinfo
async def _getAhaSvc(self, svcname): # no fancy auto-resolve, just get actual service svcpath = ('aha', 'svcfull', svcname) return await self.jsonstor.getPathObj(svcpath) async def _reqAhaSvc(self, svcname): svcpath = ('aha', 'svcfull', svcname) svcitem = await self.jsonstor.getPathObj(svcpath) if svcitem is None: raise s_exc.NoSuchName(mesg=f'No AHA service is currently named "{svcname}".', name=svcname) return svcitem
[docs] @s_nexus.Pusher.onPushAuto('aha:svc:del') async def delAhaSvc(self, name, network=None): name = self._getAhaName(name) svcname, svcnetw, svcfull = self._nameAndNetwork(name, network) logger.info(f'Deleting service [{svcfull}].', extra=await self.getLogExtra(name=svcname, netw=svcnetw)) full = ('aha', 'svcfull', svcfull) path = ('aha', 'services', svcnetw, svcname) await self.jsonstor.delPathObj(path) await self.jsonstor.delPathObj(full) # mostly for testing... await self.fire('aha:svcdel', svcname=svcname, svcnetw=svcnetw)
[docs] async def setAhaSvcDown(self, name, linkiden, network=None): name = self._getAhaName(name) svcname, svcnetw, svcfull = self._nameAndNetwork(name, network) path = ('aha', 'services', svcnetw, svcname) svcinfo = await self.jsonstor.getPathObjProp(path, 'svcinfo') if svcinfo.get('online') is None: return await self._push('aha:svc:down', name, linkiden, network=network)
@s_nexus.Pusher.onPush('aha:svc:down') async def _setAhaSvcDown(self, name, linkiden, network=None): svcname, svcnetw, svcfull = self._nameAndNetwork(name, network) path = ('aha', 'services', svcnetw, svcname) if await self.jsonstor.cmpDelPathObjProp(path, 'svcinfo/online', linkiden): await self.jsonstor.setPathObjProp(path, 'svcinfo/ready', False) # Check if we have any links which may need to be removed current_sessions = {s_common.guid(iden): sess for iden, sess in self.dmon.sessions.items()} sess = current_sessions.get(linkiden) if sess is not None: for link in [lnk for lnk in self.dmon.links if lnk.get('sess') is sess]: await link.fini() await self.fire('aha:svcdown', svcname=svcname, svcnetw=svcnetw) await self.fire(f'aha:svcdown:{svcfull}', svcname=svcname, svcnetw=svcnetw) logger.info(f'Set [{svcfull}] offline.', extra=await self.getLogExtra(name=svcname, netw=svcnetw))
[docs] async def getAhaSvc(self, name, filters=None): name = self._getAhaName(name) path = ('aha', 'svcfull', name) svcentry = await self.jsonstor.getPathObj(path) if svcentry is not None: # if they requested a mirror, try to locate one if filters is not None and filters.get('mirror'): ahanetw = svcentry.get('ahanetw') svcinfo = svcentry.get('svcinfo') if svcinfo is None: # pragma: no cover return svcentry celliden = svcinfo.get('iden') mirrors = await self.getAhaSvcMirrors(celliden, network=ahanetw) if mirrors: return random.choice(mirrors) return svcentry pooldef = await self.getAhaPool(name) if pooldef is not None: # in case the caller is not pool aware, merge a service entry and the pool def svcnames = list(pooldef.get('services').keys()) # if there are not services added to the pool it does not exist yet if not svcnames: mesg = f'No services configured for pool: {name}' raise s_exc.BadArg(mesg=mesg) svcentry = await self.jsonstor.getPathObj(('aha', 'svcfull', random.choice(svcnames))) svcentry = s_msgpack.deepcopy(svcentry) svcentry.update(pooldef) return svcentry return None
[docs] async def getAhaSvcMirrors(self, iden, network=None): retn = {} skip = None async for svcentry in self.getAhaSvcs(network=network): svcinfo = svcentry.get('svcinfo') if svcinfo is None: # pragma: no cover continue if svcinfo.get('iden') != iden: # pragma: no cover continue if svcinfo.get('online') is None: # pragma: no cover continue if not svcinfo.get('ready'): continue # if we run across the leader, skip ( and mark his run ) if svcentry.get('svcname') == svcinfo.get('leader'): skip = svcinfo.get('run') continue retn[svcinfo.get('run')] = svcentry if skip is not None: retn.pop(skip, None) return list(retn.values())
[docs] async def genCaCert(self, network): path = self.certdir.getCaCertPath(network) if path is not None: with open(path, 'rb') as fd: return fd.read().decode() logger.info(f'Generating CA certificate for {network}', extra=await self.getLogExtra(netw=network)) fut = s_coro.executor(self.certdir.genCaCert, network, save=False) pkey, cert = await fut cakey = self.certdir._pkeyToByts(pkey).decode() cacert = self.certdir._certToByts(cert).decode() # nexusify storage.. await self.saveCaCert(network, cakey, cacert) return cacert
async def _genHostCert(self, hostname, signas=None): if self.certdir.getHostCertPath(hostname) is not None: return pkey, cert = await s_coro.executor(self.certdir.genHostCert, hostname, signas=signas, save=False) pkey = self.certdir._pkeyToByts(pkey).decode() cert = self.certdir._certToByts(cert).decode() await self.saveHostCert(hostname, pkey, cert) async def _genUserCert(self, username, signas=None): if self.certdir.getUserCertPath(username) is not None: return logger.info(f'Adding user certificate for {username}') pkey, cert = await s_coro.executor(self.certdir.genUserCert, username, signas=signas, save=False) pkey = self.certdir._pkeyToByts(pkey).decode() cert = self.certdir._certToByts(cert).decode() await self.saveUserCert(username, pkey, cert)
[docs] async def getCaCert(self, network): path = self.certdir.getCaCertPath(network) if path is None: return None with open(path, 'rb') as fd: return fd.read().decode()
[docs] @s_nexus.Pusher.onPushAuto('aha:ca:save') async def saveCaCert(self, name, cakey, cacert): with s_common.genfile(self.dirn, 'certs', 'cas', f'{name}.key') as fd: fd.write(cakey.encode()) with s_common.genfile(self.dirn, 'certs', 'cas', f'{name}.crt') as fd: fd.write(cacert.encode())
[docs] @s_nexus.Pusher.onPushAuto('aha:host:save') async def saveHostCert(self, name, hostkey, hostcert): with s_common.genfile(self.dirn, 'certs', 'hosts', f'{name}.key') as fd: fd.write(hostkey.encode()) with s_common.genfile(self.dirn, 'certs', 'hosts', f'{name}.crt') as fd: fd.write(hostcert.encode())
[docs] @s_nexus.Pusher.onPushAuto('aha:user:save') async def saveUserCert(self, name, userkey, usercert): with s_common.genfile(self.dirn, 'certs', 'users', f'{name}.key') as fd: fd.write(userkey.encode()) with s_common.genfile(self.dirn, 'certs', 'users', f'{name}.crt') as fd: fd.write(usercert.encode())
[docs] async def signHostCsr(self, csrtext, signas=None, sans=None): xcsr = self.certdir._loadCsrByts(csrtext.encode()) hostname = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0].value hostpath = self.certdir.getHostCertPath(hostname) if hostpath is not None: os.unlink(hostpath) if signas is None: signas = hostname.split('.', 1)[1] logger.info(f'Signing host CSR for [{hostname}], signas={signas}, sans={sans}', extra=await self.getLogExtra(hostname=hostname, signas=signas)) pkey, cert = self.certdir.signHostCsr(xcsr, signas=signas, sans=sans) return self.certdir._certToByts(cert).decode()
[docs] async def signUserCsr(self, csrtext, signas=None): xcsr = self.certdir._loadCsrByts(csrtext.encode()) username = xcsr.subject.get_attributes_for_oid(c_x509.NameOID.COMMON_NAME)[0].value userpath = self.certdir.getUserCertPath(username) if userpath is not None: os.unlink(userpath) if signas is None: signas = username.split('@', 1)[1] logger.info(f'Signing user CSR for [{username}], signas={signas}', extra=await self.getLogExtra(name=username, signas=signas)) pkey, cert = self.certdir.signUserCsr(xcsr, signas=signas) return self.certdir._certToByts(cert).decode()
[docs] async def getAhaUrls(self, user='root'): # for backward compat... urls = self.conf.get('aha:urls') if urls is not None: return urls network = self.conf.req('aha:network') urls = [] for server in await self.getAhaServers(): host = server.get('host') port = server.get('port') urls.append(f'ssl://{host}:{port}?certname={user}@{network}') return urls
[docs] def getMyUrl(self, user='root'): port = self.sockaddr[1] host = self._getDnsName() network = self.conf.req('aha:network') return f'ssl://{host}:{port}?certname={user}@{network}'
[docs] async def getAhaClone(self, iden): lkey = s_common.uhex(iden) byts = self.slab.get(lkey, db='aha:clones') if byts is not None: return s_msgpack.un(byts)
[docs] async def addAhaClone(self, host, port=27492, conf=None): if conf is None: conf = {} network = self.conf.req('aha:network') conf['mirror'] = self.getMyUrl() conf['dns:name'] = host conf['aha:network'] = network conf['dmon:listen'] = f'ssl://0.0.0.0:{port}?hostname={host}&ca={network}' iden = s_common.guid() clone = { 'iden': iden, 'host': host, 'port': port, 'conf': conf, } await self._push('aha:clone:add', clone) logger.info(f'Created AHA clone provisioning for {host} with iden {iden}', extra=await self.getLogExtra(iden=iden, name=host, netw=network)) return self._getProvClientUrl(iden)
@s_nexus.Pusher.onPush('aha:clone:add') async def _addAhaClone(self, clone): iden = clone.get('iden') lkey = s_common.uhex(iden) self.slab.put(lkey, s_msgpack.en(clone), db='aha:clones')
[docs] async def addAhaSvcProv(self, name, provinfo=None): if not name: raise s_exc.BadArg(mesg='Empty name values are not allowed for provisioning.') self._reqProvListen() if provinfo is None: provinfo = {} iden = s_common.guid() provinfo['iden'] = iden conf = provinfo.setdefault('conf', {}) netw = self.conf.req('aha:network') ahaadmin = self.conf.get('aha:admin') if ahaadmin is not None: # pragma: no cover conf.setdefault('aha:admin', ahaadmin) ahauser = conf.setdefault('aha:user', 'root') ahaurls = await self.getAhaUrls(user=ahauser) conf['aha:network'] = netw hostname = f'{name}.{netw}' if len(hostname) > 64: mesg = f'Hostname value must not exceed 64 characters in length. {hostname=}, len={len(hostname)}' raise s_exc.BadArg(mesg=mesg) conf.setdefault('aha:name', name) dmon_port = provinfo.get('dmon:port', 0) dmon_listen = f'ssl://0.0.0.0:{dmon_port}?hostname={hostname}&ca={netw}' conf.setdefault('dmon:listen', dmon_listen) https_port = provinfo.get('https:port', s_common.novalu) if https_port is not s_common.novalu: conf.setdefault('https:port', https_port) # if the relative name contains a dot, we are a mirror peer. peer = name.find('.') != -1 leader = name.rsplit('.', 1)[-1] if peer: conf.setdefault('aha:leader', leader) conf.setdefault('aha:registry', ahaurls) mirname = provinfo.get('mirror') if mirname is not None: conf['mirror'] = f'aha://{ahauser}@{mirname}...' user = await self.auth.getUserByName(ahauser) if user is None: user = await self.auth.addUser(ahauser) perms = [ ('aha', 'service', 'get', netw), ('aha', 'service', 'add', netw, name), ] if peer: perms.append(('aha', 'service', 'add', netw, leader)) for perm in perms: if user.allowed(perm): continue await user.allow(perm) iden = await self._push('aha:svc:prov:add', provinfo) logger.info(f'Created service provisioning for {name}.{netw} with iden {iden}', extra=await self.getLogExtra(iden=iden, name=name, netw=netw)) return self._getProvClientUrl(iden)
def _getProvClientUrl(self, iden): provlisn = self._getProvListen() provport = self.provaddr[1] provhost = self._getDnsName() urlinfo = s_telepath.chopurl(provlisn) host = urlinfo.get('hostname') scheme = urlinfo.get('scheme') if host is None: host = urlinfo.get('host') newinfo = { 'host': provhost, 'port': provport, 'scheme': scheme, 'path': '/' + iden, } if scheme == 'ssl': certhash = self.certdir.getHostCertHash(host) if certhash is not None: newinfo['certhash'] = certhash return s_telepath.zipurl(newinfo)
[docs] async def getAhaSvcProv(self, iden): byts = self.slab.get(iden.encode(), db='aha:provs') if byts is not None: return s_msgpack.un(byts)
@s_nexus.Pusher.onPush('aha:svc:prov:add') async def _addAhaSvcProv(self, provinfo): iden = provinfo.get('iden') self.slab.put(iden.encode(), s_msgpack.en(provinfo), db='aha:provs') return iden
[docs] @s_nexus.Pusher.onPushAuto('aha:svc:prov:clear') async def clearAhaSvcProvs(self): for iden, byts in self.slab.scanByFull(db='aha:provs'): self.slab.delete(iden, db='aha:provs') provinfo = s_msgpack.un(byts) logger.info(f'Deleted service provisioning service={provinfo.get("conf").get("aha:name")}, iden={iden.decode()}')
[docs] @s_nexus.Pusher.onPushAuto('aha:enroll:clear') async def clearAhaUserEnrolls(self): for iden, byts in self.slab.scanByFull(db='aha:enrolls'): self.slab.delete(iden, db='aha:enrolls') userinfo = s_msgpack.un(byts) logger.info(f'Deleted user enrollment username={userinfo.get("name")}, iden={iden.decode()}')
[docs] @s_nexus.Pusher.onPushAuto('aha:clone:clear') async def clearAhaClones(self): for lkey, byts in self.slab.scanByFull(db='aha:clones'): self.slab.delete(lkey, db='aha:clones') cloninfo = s_msgpack.un(byts) logger.info(f'Deleted AHA clone enrollment username={cloninfo.get("host")}, iden={s_common.ehex(lkey)}')
[docs] @s_nexus.Pusher.onPushAuto('aha:svc:prov:del') async def delAhaSvcProv(self, iden): self.slab.delete(iden.encode(), db='aha:provs')
[docs] async def addAhaUserEnroll(self, name, userinfo=None, again=False): if not name: raise s_exc.BadArg(mesg='Empty name values are not allowed for provisioning.') provurl = self._reqProvListen() ahanetw = self.conf.req('aha:network') username = f'{name}@{ahanetw}' if len(username) > 64: mesg = f'Username value must not exceed 64 characters in length. username={username}, len={len(username)}' raise s_exc.BadArg(mesg=mesg) user = await self.auth.getUserByName(username) if user is not None: if not again: mesg = f'User name ({name}) already exists. Need --again?' raise s_exc.DupUserName(mesg=mesg) if user is None: user = await self.auth.addUser(username) await user.allow(('aha', 'service', 'get', ahanetw)) userinfo = { 'name': name, 'iden': s_common.guid(), } iden = await self._push('aha:enroll:add', userinfo) logger.info(f'Created user provisioning for {name} with iden {iden}', extra=await self.getLogExtra(iden=iden, name=name)) return self._getProvClientUrl(iden)
[docs] async def getAhaUserEnroll(self, iden): byts = self.slab.get(iden.encode(), db='aha:enrolls') if byts is not None: return s_msgpack.un(byts)
@s_nexus.Pusher.onPush('aha:enroll:add') async def _addAhaUserEnroll(self, userinfo): iden = userinfo.get('iden') self.slab.put(iden.encode(), s_msgpack.en(userinfo), db='aha:enrolls') return iden
[docs] @s_nexus.Pusher.onPushAuto('aha:enroll:del') async def delAhaUserEnroll(self, iden): self.slab.delete(iden.encode(), db='aha:enrolls')