Source code for synapse.lib.stormsvc

import asyncio
import logging

import synapse.telepath as s_telepath

import synapse.lib.base as s_base
import synapse.lib.hashitem as s_hashitem

logger = logging.getLogger(__name__)

stormcmds = (
    {
        'name': 'service.add',
        'descr': 'Add a storm service to the cortex.',
        'cmdargs': (
            ('name', {'help': 'The name of the service.'}),
            ('url', {'help': 'The telepath URL for the remote service.'}),
        ),
        'cmdconf': {},
        'storm': '''
            $sdef = $lib.service.add($cmdopts.name, $cmdopts.url)
            $lib.print("added {iden} ({name}): {url}", iden=$sdef.iden, name=$sdef.name, url=$sdef.url)
        ''',
    },
    {
        'name': 'service.del',
        'descr': 'Remove a storm service from the cortex.',
        'cmdargs': (
            ('iden', {'help': 'The service identifier or prefix.'}),
        ),
        'cmdconf': {},
        'storm': '''
            $svcs = ()

            for $sdef in $lib.service.list() {
                if $sdef.iden.startswith($cmdopts.iden) {
                    $svcs.append($sdef)
                }
            }

            $count = $svcs.size()

            if $( $count = 1 ) {
                $sdef = $svcs.index(0)
                $lib.service.del($sdef.iden)
                $lib.print("removed {iden} ({name}): {url}", iden=$sdef.iden, name=$sdef.name, url=$sdef.url)
            } elif $( $count = 0 ) {
                $lib.print("No service found by iden: {iden}", iden=$cmdopts.iden)
            } else {
                $lib.print('Multiple matches found for {iden}.  Aborting delete.', iden=$cmdopts.iden)
            }
        ''',
    },
    {
        'name': 'service.list',
        'descr': 'List the storm services configured in the cortex.',
        'cmdconf': {},
        'storm': '''
            $lib.print("")
            $lib.print("Storm service list (iden, ready, name, service name, service version, url):")
            $count = $(0)
            for $sdef in $lib.service.list() {
                $url = $sdef.url
                $iden = $sdef.iden
                $name = $sdef.name
                $ready = $sdef.ready
                $sname = $sdef.svcname
                if $sname {} else { $sname = 'Unknown' }
                $svers = $sdef.svcvers
                if $svers {
                    $svers = $lib.str.join('.', $svers)
                } else {
                    $svers = 'Unknown'
                }
                $mesg="    {iden} {ready} ({name}) ({sname} @ {svers}): {url}"
                $lib.print(mesg=$mesg, iden=$iden, ready=$ready, name=$name, sname=$sname, svers=$svers, url=$url)
                $count = $( $count + 1 )
            }
            $lib.print("")
            $lib.print("{count} services", count=$count)
        ''',
    }
)

[docs] class StormSvc: ''' The StormSvc mixin class used to make a remote storm service with commands. ''' _storm_svc_name = 'noname' _storm_svc_vers = (0, 0, 1) _storm_svc_evts = {} # type: ignore _storm_svc_pkgs = () # type: ignore
[docs] async def getStormSvcInfo(self): # Users must specify the service name assert self._storm_svc_name != 'noname' return { 'name': self._storm_svc_name, 'vers': self._storm_svc_vers, 'evts': self._storm_svc_evts, 'pkgs': await self.getStormSvcPkgs(), }
[docs] async def getStormSvcPkgs(self): return self._storm_svc_pkgs
[docs] class StormSvcClient(s_base.Base): ''' A StormService is a wrapper for a telepath proxy to a service accessible from the storm runtime. ''' async def __anit__(self, core, sdef): await s_base.Base.__anit__(self) self.core = core self.sdef = sdef self.iden = sdef.get('iden') self.name = sdef.get('name') # Local name for the cortex self.svcname = '' # remote name from the service self.svcvers = '' # remote version from the service # service info from the server... self.info = None url = self.sdef.get('url') self.ready = asyncio.Event() self.proxy = await s_telepath.Client.anit(url, onlink=self._onTeleLink) self.onfini(self.proxy.fini) async def _runSvcInit(self): # Set the latest reference for this object to the remote svcname self.core.svcsbysvcname.pop(self.svcname, None) self.svcname = self.info['name'] self.svcvers = self.info['vers'] self.core.svcsbysvcname[self.svcname] = self await self.core.feedBeholder('svc:set', {'name': self.name, 'iden': self.iden, 'svcname': self.svcname, 'version': self.svcvers}) # if the old service is the same as the new service, just skip oldpkgs = self.core.getStormSvcPkgs(self.iden) byname = {} done = set() for pdef in oldpkgs: iden = s_hashitem.hashitem(pdef) byname[pdef.get('name')] = iden # Register new packages for pdef in self.info.get('pkgs', ()): try: pdef['svciden'] = self.iden await self.core._normStormPkg(pdef) except Exception: name = pdef.get('name') logger.exception(f'normStormPkg ({name}) failed for service {self.name} ({self.iden})') continue name = pdef.get('name') iden = s_hashitem.hashitem(pdef) done.add(name) if name in byname: if byname[name] != iden: await self.core._delStormPkg(name) # we're updating an old package, so delete the old and then re-add else: continue # pkg unchanged. Can skip. try: # push the svciden in the package metadata for later reference. await self.core._addStormPkg(pdef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'addStormPkg ({name}) failed for service {self.name} ({self.iden})') # clean up any packages that no longer exist for name in byname.keys(): if name not in done: await self.core._delStormPkg(name) # Set events and fire as needed evts = self.info.get('evts') try: if evts is not None: self.sdef = await self.core.setStormSvcEvents(self.iden, evts) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'setStormSvcEvents failed for service {self.name} ({self.iden})') try: if self.core.isactive: await self.core._runStormSvcAdd(self.iden) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'service.add storm hook failed for service {self.name} ({self.iden})') async def _onTeleLink(self, proxy): clss = proxy._getClasses() names = [c.rsplit('.', 1)[-1] for c in clss] if 'StormSvc' in names: self.info = await proxy.getStormSvcInfo() await self._runSvcInit() async def unready(): self.ready.clear() await self.core.fire("stormsvc:client:unready", iden=self.iden) proxy.onfini(unready) self.ready.set()