Source code for synapse.cortex

import os
import copy
import regex
import asyncio
import logging
import textwrap
import contextlib
import collections

from collections.abc import Mapping

import synapse
import synapse.exc as s_exc
import synapse.axon as s_axon
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.datamodel as s_datamodel

import synapse.lib.base as s_base
import synapse.lib.cell as s_cell
import synapse.lib.chop as s_chop
import synapse.lib.coro as s_coro
import synapse.lib.hive as s_hive
import synapse.lib.view as s_view
import synapse.lib.cache as s_cache
import synapse.lib.const as s_const
import synapse.lib.layer as s_layer
import synapse.lib.nexus as s_nexus
import synapse.lib.oauth as s_oauth
import synapse.lib.queue as s_queue
import synapse.lib.scope as s_scope
import synapse.lib.storm as s_storm
import synapse.lib.agenda as s_agenda
import synapse.lib.config as s_config
import synapse.lib.parser as s_parser
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.grammar as s_grammar
import synapse.lib.httpapi as s_httpapi
import synapse.lib.msgpack as s_msgpack
import synapse.lib.modules as s_modules
import synapse.lib.schemas as s_schemas
import synapse.lib.spooled as s_spooled
import synapse.lib.version as s_version
import synapse.lib.urlhelp as s_urlhelp
import synapse.lib.hashitem as s_hashitem
import synapse.lib.jsonstor as s_jsonstor
import synapse.lib.modelrev as s_modelrev
import synapse.lib.stormsvc as s_stormsvc
import synapse.lib.lmdbslab as s_lmdbslab

import synapse.lib.crypto.rsa as s_rsa

# Importing these registers their commands
import synapse.lib.stormhttp as s_stormhttp  # NOQA
import synapse.lib.stormwhois as s_stormwhois  # NOQA

import synapse.lib.stormtypes as s_stormtypes

import synapse.lib.stormlib.aha as s_stormlib_aha  # NOQA
import synapse.lib.stormlib.env as s_stormlib_env  # NOQA
import synapse.lib.stormlib.gen as s_stormlib_gen  # NOQA
import synapse.lib.stormlib.gis as s_stormlib_gis  # NOQA
import synapse.lib.stormlib.hex as s_stormlib_hex  # NOQA
import synapse.lib.stormlib.log as s_stormlib_log  # NOQA
import synapse.lib.stormlib.xml as s_stormlib_xml  # NOQA
import synapse.lib.stormlib.auth as s_stormlib_auth  # NOQA
import synapse.lib.stormlib.cell as s_stormlib_cell  # NOQA
import synapse.lib.stormlib.imap as s_stormlib_imap  # NOQA
import synapse.lib.stormlib.ipv6 as s_stormlib_ipv6  # NOQA
import synapse.lib.stormlib.json as s_stormlib_json  # NOQA
import synapse.lib.stormlib.math as s_stormlib_math  # NOQA
import synapse.lib.stormlib.mime as s_stormlib_mime  # NOQA
import synapse.lib.stormlib.pack as s_stormlib_pack  # NOQA
import synapse.lib.stormlib.smtp as s_stormlib_smtp  # NOQA
import synapse.lib.stormlib.stix as s_stormlib_stix  # NOQA
import synapse.lib.stormlib.yaml as s_stormlib_yaml  # NOQA
import synapse.lib.stormlib.basex as s_stormlib_basex  # NOQA
import synapse.lib.stormlib.cache as s_stormlib_cache  # NOQA
import synapse.lib.stormlib.graph as s_stormlib_graph  # NOQA
import synapse.lib.stormlib.iters as s_stormlib_iters  # NOQA
import synapse.lib.stormlib.macro as s_stormlib_macro
import synapse.lib.stormlib.model as s_stormlib_model
import synapse.lib.stormlib.oauth as s_stormlib_oauth  # NOQA
import synapse.lib.stormlib.stats as s_stormlib_stats  # NOQA
import synapse.lib.stormlib.storm as s_stormlib_storm  # NOQA
import synapse.lib.stormlib.vault as s_stormlib_vault  # NOQA
import synapse.lib.stormlib.backup as s_stormlib_backup  # NOQA
import synapse.lib.stormlib.cortex as s_stormlib_cortex  # NOQA
import synapse.lib.stormlib.hashes as s_stormlib_hashes  # NOQA
import synapse.lib.stormlib.random as s_stormlib_random  # NOQA
import synapse.lib.stormlib.scrape as s_stormlib_scrape   # NOQA
import synapse.lib.stormlib.infosec as s_stormlib_infosec  # NOQA
import synapse.lib.stormlib.project as s_stormlib_project  # NOQA
import synapse.lib.stormlib.spooled as s_stormlib_spooled  # NOQA
import synapse.lib.stormlib.tabular as s_stormlib_tabular  # NOQA
import synapse.lib.stormlib.version as s_stormlib_version  # NOQA
import synapse.lib.stormlib.easyperm as s_stormlib_easyperm  # NOQA
import synapse.lib.stormlib.ethereum as s_stormlib_ethereum  # NOQA
import synapse.lib.stormlib.modelext as s_stormlib_modelext  # NOQA
import synapse.lib.stormlib.compression as s_stormlib_compression  # NOQA
import synapse.lib.stormlib.notifications as s_stormlib_notifications  # NOQA

logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')

'''
A Cortex implements the synapse hypergraph object.
'''

reqver = '>=0.2.0,<3.0.0'

# Constants returned in results from syncLayersEvents and syncIndexEvents
SYNC_NODEEDITS = 0  # A nodeedits: (<offs>, 0, <etyp>, (<etype args>), {<meta>})
SYNC_NODEEDIT = 1   # A nodeedit:  (<offs>, 0, <etyp>, (<etype args>))
SYNC_LAYR_ADD = 3   # A layer was added
SYNC_LAYR_DEL = 4   # A layer was deleted

MAX_NEXUS_DELTA = 3_600

reqValidTagModel = s_config.getJsValidator({
    'type': 'object',
    'properties': {
        'prune': {'type': 'number', 'minimum': 1},
        'regex': {'type': 'array', 'items': {'type': ['string', 'null']}},
    },
    'additionalProperties': False,
    'required': [],
})

reqValidStormMacro = s_config.getJsValidator({
    'type': 'object',
    'properties': {
        'name': {'type': 'string', 'pattern': '^.{1,491}$'},
        'iden': {'type': 'string', 'pattern': s_config.re_iden},
        # user kept for backward compat. remove eventually...
        'user': {'type': 'string', 'pattern': s_config.re_iden},
        'creator': {'type': 'string', 'pattern': s_config.re_iden},
        'desc': {'type': 'string', 'default': ''},
        'storm': {'type': 'string'},
        'created': {'type': 'number'},
        'updated': {'type': 'number'},
        'permissions': s_msgpack.deepcopy(s_schemas.easyPermSchema),
    },
    'required': [
        'name',
        'iden',
        'user',
        'storm',
        'creator',
        'created',
        'updated',
        'permissions',
    ],
})

[docs] def cmprkey_indx(x): return x[1]
[docs] def cmprkey_buid(x): return x[1][1]
[docs] async def wrap_liftgenr(iden, genr): async for indx, buid, sode in genr: yield iden, (indx, buid), sode
[docs] class CortexAxonMixin:
[docs] async def prepare(self): await self.cell.axready.wait() await s_coro.ornot(super().prepare)
[docs] def getAxon(self): return self.cell.axon
[docs] async def getAxonInfo(self): return self.cell.axoninfo
[docs] class CortexAxonHttpHasV1(CortexAxonMixin, s_axon.AxonHttpHasV1): pass
[docs] class CortexAxonHttpDelV1(CortexAxonMixin, s_axon.AxonHttpDelV1): pass
[docs] class CortexAxonHttpUploadV1(CortexAxonMixin, s_axon.AxonHttpUploadV1): pass
[docs] class CortexAxonHttpBySha256V1(CortexAxonMixin, s_axon.AxonHttpBySha256V1): pass
[docs] class CortexAxonHttpBySha256InvalidV1(CortexAxonMixin, s_axon.AxonHttpBySha256InvalidV1): pass
[docs] class CoreApi(s_cell.CellApi): ''' The CoreApi is exposed when connecting to a Cortex over Telepath. Many CoreApi methods operate on packed nodes consisting of primitive data structures which can be serialized with msgpack/json. An example of a packaged Node:: ( (<form>, <valu>), { "props": { <name>: <valu>, ... }, "tags": { "foo": <time>, "foo.bar": <time>, }, }) '''
[docs] @s_cell.adminapi() def getCoreMods(self): return self.cell.getCoreMods()
[docs] async def getModelDict(self): ''' Return a dictionary which describes the data model. Returns: (dict): A model description dictionary. ''' return await self.cell.getModelDict()
[docs] async def getModelDefs(self): return await self.cell.getModelDefs()
[docs] def getCoreInfo(self): ''' Return static generic information about the cortex including model definition ''' return self.cell.getCoreInfo()
[docs] async def getCoreInfoV2(self): ''' Return static generic information about the cortex including model definition ''' return await self.cell.getCoreInfoV2()
[docs] @s_cell.adminapi() async def saveLayerNodeEdits(self, layriden, edits, meta): return await self.cell.saveLayerNodeEdits(layriden, edits, meta)
def _reqValidStormOpts(self, opts): if opts is None: opts = {} opts.setdefault('user', self.user.iden) if opts.get('user') != self.user.iden: self.user.confirm(('impersonate',)) return opts
[docs] async def callStorm(self, text, opts=None): ''' Return the value expressed in a return() statement within storm. ''' opts = self._reqValidStormOpts(opts) return await self.cell.callStorm(text, opts=opts)
[docs] async def exportStorm(self, text, opts=None): ''' Execute a storm query and package nodes for export/import. NOTE: This API yields nodes after an initial complete lift in order to limit exported edges. ''' opts = self._reqValidStormOpts(opts) async for pode in self.cell.exportStorm(text, opts=opts): yield pode
[docs] async def feedFromAxon(self, sha256, opts=None): ''' Import a msgpack .nodes file from the axon. ''' opts = self._reqValidStormOpts(opts) return await self.cell.feedFromAxon(sha256, opts=opts)
async def _reqDefLayerAllowed(self, perms): view = self.cell.getView() wlyr = view.layers[0] self.user.confirm(perms, gateiden=wlyr.iden)
[docs] async def addNode(self, form, valu, props=None): ''' Deprecated in 2.0.0. ''' s_common.deprecated('CoreApi.addNode') async with await self.cell.snap(user=self.user) as snap: self.user.confirm(('node', 'add', form), gateiden=snap.wlyr.iden) node = await snap.addNode(form, valu, props=props) return node.pack()
[docs] async def addNodes(self, nodes): ''' Add a list of packed nodes to the cortex. Args: nodes (list): [ ( (form, valu), {'props':{}, 'tags':{}}), ... ] Yields: (tuple): Packed node tuples ((form,valu), {'props': {}, 'tags':{}}) Deprecated in 2.0.0 ''' s_common.deprecated('CoreApi.addNodes') # First check that that user may add each form done = {} for node in nodes: formname = node[0][0] if done.get(formname): continue await self._reqDefLayerAllowed(('node', 'add', formname)) done[formname] = True async with await self.cell.snap(user=self.user) as snap: snap.strict = False async for node in snap.addNodes(nodes): if node is not None: node = node.pack() yield node
[docs] async def getFeedFuncs(self): ''' Get a list of Cortex feed functions. Notes: Each feed dictionary has the name of the feed function, the full docstring for the feed function, and the first line of the docstring broken out in their own keys for easy use. Returns: tuple: A tuple of dictionaries. ''' return await self.cell.getFeedFuncs()
[docs] async def addFeedData(self, name, items, *, viewiden=None): view = self.cell.getView(viewiden, user=self.user) if view is None: raise s_exc.NoSuchView(mesg=f'No such view iden={viewiden}', iden=viewiden) wlyr = view.layers[0] parts = name.split('.') self.user.confirm(('feed:data', *parts), gateiden=wlyr.iden) await self.cell.boss.promote('feeddata', user=self.user, info={'name': name, 'view': view.iden, 'nitems': len(items), }) async with await self.cell.snap(user=self.user, view=view) as snap: snap.strict = False await snap.addFeedData(name, items)
[docs] async def count(self, text, opts=None): ''' Count the number of nodes which result from a storm query. Args: text (str): Storm query text. opts (dict): Storm query options. Returns: (int): The number of nodes resulting from the query. ''' opts = self._reqValidStormOpts(opts) return await self.cell.count(text, opts=opts)
[docs] async def storm(self, text, opts=None): ''' Evaluate a storm query and yield result messages. Yields: ((str,dict)): Storm messages. ''' opts = self._reqValidStormOpts(opts) async for mesg in self.cell.storm(text, opts=opts): yield mesg
[docs] async def reqValidStorm(self, text, opts=None): ''' Parse a Storm query to validate it. Args: text (str): The text of the Storm query to parse. opts (dict): A Storm options dictionary. Returns: True: If the query is valid. Raises: BadSyntaxError: If the query is invalid. ''' return await self.cell.reqValidStorm(text, opts)
[docs] async def syncLayerNodeEdits(self, offs, layriden=None, wait=True): ''' Yield (indx, mesg) nodeedit sets for the given layer beginning at offset. Once caught up, this API will begin yielding nodeedits in real-time. The generator will only terminate on network disconnect or if the consumer falls behind the max window size of 10,000 nodeedit messages. ''' layr = self.cell.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) self.user.confirm(('sync',), gateiden=layr.iden) async for item in self.cell.syncLayerNodeEdits(layr.iden, offs, wait=wait): yield item
[docs] async def getPropNorm(self, prop, valu, typeopts=None): ''' Get the normalized property value based on the Cortex data model. Args: prop (str): The property to normalize. valu: The value to normalize. typeopts: A Synapse type opts dictionary used to further normalize the value. Returns: (tuple): A two item tuple, containing the normed value and the info dictionary. Raises: s_exc.NoSuchProp: If the prop does not exist. s_exc.BadTypeValu: If the value fails to normalize. ''' return await self.cell.getPropNorm(prop, valu, typeopts=typeopts)
[docs] async def getTypeNorm(self, name, valu, typeopts=None): ''' Get the normalized type value based on the Cortex data model. Args: name (str): The type to normalize. valu: The value to normalize. typeopts: A Synapse type opts dictionary used to further normalize the value. Returns: (tuple): A two item tuple, containing the normed value and the info dictionary. Raises: s_exc.NoSuchType: If the type does not exist. s_exc.BadTypeValu: If the value fails to normalize. ''' return await self.cell.getTypeNorm(name, valu, typeopts=typeopts)
[docs] async def addForm(self, formname, basetype, typeopts, typeinfo): ''' Add an extended form to the data model. Extended forms *must* begin with _ ''' self.user.confirm(('model', 'form', 'add', formname)) return await self.cell.addForm(formname, basetype, typeopts, typeinfo)
[docs] async def delForm(self, formname): ''' Remove an extended form from the data model. ''' self.user.confirm(('model', 'form', 'del', formname)) return await self.cell.delForm(formname)
[docs] async def addFormProp(self, form, prop, tdef, info): ''' Add an extended property to the given form. Extended properties *must* begin with _ ''' self.user.confirm(('model', 'prop', 'add', form)) if not s_grammar.isBasePropNoPivprop(prop): mesg = f'Invalid prop name {prop}' raise s_exc.BadPropDef(prop=prop, mesg=mesg) return await self.cell.addFormProp(form, prop, tdef, info)
[docs] async def delFormProp(self, form, name): ''' Remove an extended property from the given form. ''' self.user.confirm(('model', 'prop', 'del', form)) return await self.cell.delFormProp(form, name)
[docs] async def addUnivProp(self, name, tdef, info): ''' Add an extended universal property. Extended properties *must* begin with _ ''' self.user.confirm(('model', 'univ', 'add')) if not s_grammar.isBasePropNoPivprop(name): mesg = f'Invalid prop name {name}' raise s_exc.BadPropDef(name=name, mesg=mesg) return await self.cell.addUnivProp(name, tdef, info)
[docs] async def delUnivProp(self, name): ''' Remove an extended universal property. ''' self.user.confirm(('model', 'univ', 'del')) return await self.cell.delUnivProp(name)
[docs] async def addTagProp(self, name, tdef, info): ''' Add a tag property to record data about tags on nodes. ''' self.user.confirm(('model', 'tagprop', 'add')) if not s_grammar.isBasePropNoPivprop(name): mesg = f'Invalid prop name {name}' raise s_exc.BadPropDef(name=name, mesg=mesg) return await self.cell.addTagProp(name, tdef, info)
[docs] async def delTagProp(self, name): ''' Remove a previously added tag property. ''' self.user.confirm(('model', 'tagprop', 'del')) return await self.cell.delTagProp(name)
[docs] async def addStormPkg(self, pkgdef, verify=False): self.user.confirm(('pkg', 'add')) return await self.cell.addStormPkg(pkgdef, verify=verify)
[docs] async def delStormPkg(self, iden): self.user.confirm(('pkg', 'del')) return await self.cell.delStormPkg(iden)
[docs] @s_cell.adminapi() async def getStormPkgs(self): return await self.cell.getStormPkgs()
[docs] @s_cell.adminapi() async def getStormPkg(self, name): return await self.cell.getStormPkg(name)
[docs] @s_cell.adminapi() async def addStormDmon(self, ddef): return await self.cell.addStormDmon(ddef)
[docs] @s_cell.adminapi() async def getStormDmons(self): return await self.cell.getStormDmons()
[docs] @s_cell.adminapi() async def getStormDmonLog(self, iden): return await self.cell.getStormDmonLog(iden)
[docs] @s_cell.adminapi() async def getStormDmon(self, iden): return await self.cell.getStormDmon(iden)
[docs] @s_cell.adminapi() async def bumpStormDmon(self, iden): return await self.cell.bumpStormDmon(iden)
[docs] @s_cell.adminapi() async def disableStormDmon(self, iden): return await self.cell.disableStormDmon(iden)
[docs] @s_cell.adminapi() async def enableStormDmon(self, iden): return await self.cell.enableStormDmon(iden)
[docs] @s_cell.adminapi() async def delStormDmon(self, iden): return await self.cell.delStormDmon(iden)
[docs] @s_cell.adminapi() async def cloneLayer(self, iden, ldef=None): ldef = ldef or {} ldef['creator'] = self.user.iden return await self.cell.cloneLayer(iden, ldef)
[docs] async def getStormVar(self, name, default=None): self.user.confirm(('globals', 'get', name)) return await self.cell.getStormVar(name, default=default)
[docs] async def popStormVar(self, name, default=None): self.user.confirm(('globals', 'pop', name)) return await self.cell.popStormVar(name, default=default)
[docs] async def setStormVar(self, name, valu): self.user.confirm(('globals', 'set', name)) return await self.cell.setStormVar(name, valu)
[docs] async def syncLayersEvents(self, offsdict=None, wait=True): self.user.confirm(('sync',)) async for item in self.cell.syncLayersEvents(offsdict=offsdict, wait=wait): yield item
[docs] async def syncIndexEvents(self, matchdef, offsdict=None, wait=True): self.user.confirm(('sync',)) async for item in self.cell.syncIndexEvents(matchdef, offsdict=offsdict, wait=wait): yield item
[docs] async def iterFormRows(self, layriden, form, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes of a single form, optionally (re)starting at startvalue Args: layriden (str): Iden of the layer to retrieve the nodes form(str): A form name stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' self.user.confirm(('layer', 'lift', layriden)) async for item in self.cell.iterFormRows(layriden, form, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterPropRows(self, layriden, form, prop, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes with a particular secondary property, optionally (re)starting at startvalue Args: layriden (str): Iden of the layer to retrieve the nodes form(str): A form name. prop (str): A secondary property name. stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' self.user.confirm(('layer', 'lift', layriden)) async for item in self.cell.iterPropRows(layriden, form, prop, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterUnivRows(self, layriden, prop, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes with a particular universal property, optionally (re)starting at startvalue Args: layriden (str): Iden of the layer to retrieve the nodes prop (str): A universal property name. stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' self.user.confirm(('layer', 'lift', layriden)) async for item in self.cell.iterUnivRows(layriden, prop, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterTagRows(self, layriden, tag, form=None, starttupl=None): ''' Yields (buid, (valu, form)) values that match a tag and optional form, optionally (re)starting at starttupl. Args: layriden (str): Iden of the layer to retrieve the nodes tag (str): the tag to match form (Optional[str]): if present, only yields buids of nodes that match the form. starttupl (Optional[Tuple[buid, form]]): if present, (re)starts the stream of values there. Returns: AsyncIterator[Tuple(buid, (valu, form))] Note: This yields (buid, (tagvalu, form)) instead of just buid, valu in order to allow resuming an interrupted call by feeding the last value retrieved into starttupl ''' self.user.confirm(('layer', 'lift', layriden)) async for item in self.cell.iterTagRows(layriden, tag, form=form, starttupl=starttupl): yield item
[docs] async def iterTagPropRows(self, layriden, tag, prop, form=None, stortype=None, startvalu=None): ''' Yields (buid, valu) that match a tag:prop, optionally (re)starting at startvalu. Args: layriden (str): Iden of the layer to retrieve the nodes tag (str): tag name prop (str): prop name form (Optional[str]): optional form name stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' self.user.confirm(('layer', 'lift', layriden)) async for item in self.cell.iterTagPropRows(layriden, tag, prop, form=form, stortype=stortype, startvalu=startvalu): yield item
[docs] async def getAxonUpload(self): self.user.confirm(('axon', 'upload')) await self.cell.axready.wait() upload = await self.cell.axon.upload() return await s_axon.UpLoadProxy.anit(self.link, upload)
[docs] async def getAxonBytes(self, sha256): self.user.confirm(('axon', 'get')) await self.cell.axready.wait() async for byts in self.cell.axon.get(s_common.uhex(sha256)): yield byts
[docs] @s_cell.adminapi() async def getUserNotif(self, indx): return await self.cell.getUserNotif(indx)
[docs] @s_cell.adminapi() async def delUserNotif(self, indx): return await self.cell.delUserNotif(indx)
[docs] @s_cell.adminapi() async def addUserNotif(self, useriden, mesgtype, mesgdata=None): return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)
[docs] @s_cell.adminapi() async def iterUserNotifs(self, useriden, size=None): async for item in self.cell.iterUserNotifs(useriden, size=size): yield item
[docs] @s_cell.adminapi() async def watchAllUserNotifs(self, offs=None): async for item in self.cell.watchAllUserNotifs(offs=offs): yield item
[docs] @s_cell.adminapi() async def getHttpExtApiByPath(self, path): return await self.cell.getHttpExtApiByPath(path)
[docs] class Cortex(s_oauth.OAuthMixin, s_cell.Cell): # type: ignore ''' A Cortex implements the synapse hypergraph. The bulk of the Cortex API lives on the Snap() object which can be obtained by calling Cortex.snap() in a with block. This allows callers to manage transaction boundaries explicitly and dramatically increases performance. ''' # For the cortex, nexslog:en defaults to True confbase = copy.deepcopy(s_cell.Cell.confbase) confbase['nexslog:en']['default'] = True # type: ignore confbase['mirror']['hidedocs'] = False # type: ignore confbase['mirror']['hidecmdl'] = False # type: ignore confdefs = { 'axon': { 'description': 'A telepath URL for a remote axon.', 'type': 'string' }, 'jsonstor': { 'description': 'A telepath URL for a remote jsonstor.', 'type': 'string' }, 'cron:enable': { 'default': True, 'description': 'Deprecated. This option no longer controls cron execution and will be removed in Synapse 3.0.', 'type': 'boolean' }, 'trigger:enable': { 'default': True, 'description': 'Deprecated. This option no longer controls trigger execution and will be removed in Synapse 3.0.', 'type': 'boolean' }, 'layer:lmdb:map_async': { 'default': True, 'description': 'Deprecated. This value is ignored.', 'type': 'boolean', 'hidecmdl': True, 'hideconf': True, }, 'layer:lmdb:max_replay_log': { 'default': 10000, 'description': 'Deprecated. This value is ignored.', 'type': 'integer', 'hidecmdl': True, 'hideconf': True, }, 'layers:lockmemory': { 'default': False, 'description': 'Should new layers lock memory for performance by default.', 'type': 'boolean' }, 'layers:logedits': { 'default': True, 'description': 'Whether nodeedits are logged in each layer.', 'type': 'boolean' }, 'provenance:en': { # TODO: Remove in 3.0.0 'default': False, 'description': 'This no longer does anything.', 'type': 'boolean', 'hideconf': True, }, 'max:nodes': { 'description': 'Maximum number of nodes which are allowed to be stored in a Cortex.', 'type': 'integer', 'minimum': 1, 'hidecmdl': True, }, 'modules': { 'default': [], 'description': 'A list of module classes to load.', 'type': 'array' }, 'storm:log': { 'default': False, 'description': 'Log storm queries via system logger.', 'type': 'boolean' }, 'storm:log:level': { 'default': 'INFO', 'description': 'Logging log level to emit storm logs at.', 'type': [ 'integer', 'string', ], }, 'storm:interface:search': { 'default': True, 'description': 'Enable Storm search interfaces for lookup mode.', 'type': 'boolean', }, 'storm:interface:scrape': { 'default': True, 'description': 'Enable Storm scrape interfaces when using $lib.scrape APIs.', 'type': 'boolean', }, 'http:proxy': { 'description': 'An aiohttp-socks compatible proxy URL to use storm HTTP API.', 'type': 'string', }, 'tls:ca:dir': { 'description': 'An optional directory of CAs which are added to the TLS CA chain for Storm HTTP API calls.', 'type': 'string', }, } cellapi = CoreApi viewapi = s_view.ViewApi layerapi = s_layer.LayerApi hiveapi = s_hive.HiveApi viewctor = s_view.View.anit layrctor = s_layer.Layer.anit # phase 2 - service storage
[docs] async def initServiceStorage(self): # NOTE: we may not make *any* nexus actions in this method self.macrodb = self.slab.initdb('storm:macros') self.httpextapidb = self.slab.initdb('http:ext:apis') if self.inaugural: self.cellinfo.set('cortex:version', s_version.version) corevers = self.cellinfo.get('cortex:version') s_version.reqVersion(corevers, reqver, exc=s_exc.BadStorageVersion, mesg='cortex version in storage is incompatible with running software') self.viewmeta = self.slab.initdb('view:meta') self.views = {} self.layers = {} self.viewsbylayer = collections.defaultdict(list) self.modules = {} self.feedfuncs = {} self.stormcmds = {} self.maxnodes = self.conf.get('max:nodes') self.nodecount = 0 self.migration = False self._migration_lock = asyncio.Lock() self.stormmods = {} # name: mdef self.stormpkgs = {} # name: pkgdef self.stormvars = None # type: s_lmdbslab.SafeKeyVal self.svcsbyiden = {} self.svcsbyname = {} self.svcsbysvcname = {} # remote name, not local name self._propSetHooks = {} self._runtLiftFuncs = {} self._runtPropSetFuncs = {} self._runtPropDelFuncs = {} self.tagvalid = s_cache.FixedCache(self._isTagValid, size=1000) self.tagprune = s_cache.FixedCache(self._getTagPrune, size=1000) self.querycache = s_cache.FixedCache(self._getStormQuery, size=10000) self.stormpool = None self.stormpoolurl = None self.stormpoolopts = None self.libroot = (None, {}, {}) self.stormlibs = [] self.bldgbuids = {} # buid -> (Node, Event) Nodes under construction self.axon = None # type: s_axon.AxonApi self.axready = asyncio.Event() self.axoninfo = {} self.view = None # The default/main view self._cortex_permdefs = [] self._initCorePerms() # Reset the storm:log:level from the config value to an int for internal use. self.conf['storm:log:level'] = s_common.normLogLevel(self.conf.get('storm:log:level')) self.stormlog = self.conf.get('storm:log') self.stormloglvl = self.conf.get('storm:log:level') # generic fini handler for the Cortex self.onfini(self._onCoreFini) self.cortexdata = self.slab.getSafeKeyVal('cortex') await self._initCoreInfo() self._initStormLibs() self._initFeedFuncs() self.modsbyiface = {} self.stormiface_search = self.conf.get('storm:interface:search') self.stormiface_scrape = self.conf.get('storm:interface:scrape') self._initCortexHttpApi() self._exthttpapis = {} # iden -> adef; relies on cpython ordered dictionary behavior. self._exthttpapiorder = b'exthttpapiorder' self._exthttpapicache = s_cache.FixedCache(self._getHttpExtApiByPath, size=1000) self._initCortexExtHttpApi() self.model = s_datamodel.Model(core=self) await self._bumpCellVers('cortex:extmodel', ( (1, self._migrateTaxonomyIface), ), nexs=False) await self._bumpCellVers('cortex:storage', ( (1, self._storUpdateMacros), (4, self._storCortexHiveMigration), ), nexs=False) # Perform module loading await self._loadCoreMods() await self._loadExtModel() await self._initStormCmds() # Initialize our storage and views await self._initCoreAxon() await self._initJsonStor() await self._initCoreLayers() await self._initCoreViews() self.onfini(self._finiStor) await self._initCoreQueues() self.addHealthFunc(self._cortexHealth) await self._initOAuthManager() self.stormdmondefs = self.cortexdata.getSubKeyVal('storm:dmons:') self.stormdmons = await s_storm.DmonManager.anit(self) self.onfini(self.stormdmons) self.agenda = await s_agenda.Agenda.anit(self) self.onfini(self.agenda) await self._initStormGraphs() await self._initRuntFuncs() self.tagmeta = self.cortexdata.getSubKeyVal('tagmeta:') self.cmddefs = self.cortexdata.getSubKeyVal('storm:cmds:') self.pkgdefs = self.cortexdata.getSubKeyVal('storm:packages:') self.svcdefs = self.cortexdata.getSubKeyVal('storm:services:') await self._initDeprLocks() await self._warnDeprLocks() # Finalize coremodule loading & give svchive a shot to load await self._initPureStormCmds() self.dynitems.update({ 'cron': self.agenda, 'cortex': self, 'multiqueue': self.multiqueue, }) # TODO - Remove this in 3.0.0 ag = await self.auth.addAuthGate('cortex', 'cortex') for useriden in ag.gateusers.keys(): user = self.auth.user(useriden) if user is None: continue mesg = f'User {useriden} ({user.name}) has a rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' logger.warning(mesg, extra=await self.getLogExtra(user=useriden, username=user.name)) for roleiden in ag.gateroles.keys(): role = self.auth.role(roleiden) if role is None: continue mesg = f'Role {roleiden} ({role.name}) has a rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' logger.warning(mesg, extra=await self.getLogExtra(role=roleiden, rolename=role.name)) self._initVaults()
async def _storCortexHiveMigration(self): logger.warning('migrating Cortex data out of hive') viewdefs = self.cortexdata.getSubKeyVal('view:info:') async with await self.hive.open(('cortex', 'views')) as viewnodes: for view_iden, node in viewnodes: viewdict = await node.dict() viewinfo = viewdict.pack() viewinfo.setdefault('iden', view_iden) viewdefs.set(view_iden, viewinfo) trigdict = self.cortexdata.getSubKeyVal(f'view:{view_iden}:trigger:') async with await node.open(('triggers',)) as trignodes: for iden, trig in trignodes: valu = trig.valu if valu.get('view', s_common.novalu) != view_iden: valu['view'] = view_iden trigdict.set(iden, valu) layrdefs = self.cortexdata.getSubKeyVal('layer:info:') async with await self.hive.open(('cortex', 'layers')) as layrnodes: for iden, node in layrnodes: layrdict = await node.dict() layrinfo = layrdict.pack() pushs = layrinfo.get('pushs', {}) if pushs: for pdef in pushs.values(): pdef.setdefault('chunk:size', s_const.layer_pdef_csize) pdef.setdefault('queue:size', s_const.layer_pdef_qsize) pulls = layrinfo.get('pulls', {}) if pulls: pulls = layrinfo.get('pulls', {}) for pdef in pulls.values(): pdef.setdefault('chunk:size', s_const.layer_pdef_csize) pdef.setdefault('queue:size', s_const.layer_pdef_qsize) layrdefs.set(iden, layrinfo) migrs = ( (('agenda', 'appts'), 'agenda:appt:'), (('cortex', 'tagmeta'), 'tagmeta:'), (('cortex', 'storm', 'cmds'), 'storm:cmds:'), (('cortex', 'storm', 'vars'), 'storm:vars:'), (('cortex', 'storm', 'dmons'), 'storm:dmons:'), (('cortex', 'storm', 'packages'), 'storm:packages:'), (('cortex', 'storm', 'services'), 'storm:services:'), (('cortex', 'model', 'forms'), 'model:forms:'), (('cortex', 'model', 'props'), 'model:props:'), (('cortex', 'model', 'univs'), 'model:univs:'), (('cortex', 'model', 'tagprops'), 'model:tagprops:'), (('cortex', 'model', 'deprlocks'), 'model:deprlocks:'), ) for hivepath, kvpref in migrs: subkv = self.cortexdata.getSubKeyVal(kvpref) async with await self.hive.open(hivepath) as hivenode: for name, node in hivenode: subkv.set(name, node.valu) logger.warning('...Cortex data migration complete!') async def _viewNomergeToProtected(self): for view in self.views.values(): nomerge = view.info.get('nomerge', False) await view.setViewInfo('protected', nomerge) await view.setViewInfo('nomerge', None) async def _storUpdateMacros(self): for name, node in await self.hive.open(('cortex', 'storm', 'macros')): try: info = { 'name': name, 'storm': node.valu.get('storm'), } user = node.valu.get('user') if user is not None: info['user'] = user created = node.valu.get('created') if created is not None: info['created'] = created edited = node.valu.get('edited') if edited is not None: info['updated'] = edited if info.get('created') is None: info['created'] = edited mdef = self._initStormMacro(info) await self._addStormMacro(mdef) except Exception as e: logger.exception(f'Macro migration error for macro: {name} (skipped).')
[docs] def getStormMacro(self, name, user=None): if not name: raise s_exc.BadArg(mesg=f'Macro names must be at least 1 character long') if len(name) > 491: raise s_exc.BadArg(mesg='Macro names may only be up to 491 chars.') byts = self.slab.get(name.encode(), db=self.macrodb) if byts is None: return None mdef = s_msgpack.un(byts) if user is not None: mesg = f'User requires read permission on macro: {name}.' self._reqEasyPerm(mdef, user, s_cell.PERM_READ, mesg=mesg) return mdef
[docs] def reqStormMacro(self, name, user=None): mdef = self.getStormMacro(name) if mdef is None: raise s_exc.NoSuchName(mesg=f'Macro name not found: {name}') if user is not None: mesg = f'User requires read permission on macro: {name}.' self._reqEasyPerm(mdef, user, s_cell.PERM_READ, mesg=mesg) return mdef
def _reqStormMacroPerm(self, user, name, level): mdef = self.reqStormMacro(name) mesg = f'User requires {s_cell.permnames.get(level)} permission on macro: {name}' if level == s_cell.PERM_EDIT and ( user.allowed(('storm', 'macro', 'edit')) or user.allowed(('storm', 'macro', 'admin'))): return mdef if level == s_cell.PERM_ADMIN and user.allowed(('storm', 'macro', 'admin')): return mdef self._reqEasyPerm(mdef, user, level, mesg=mesg) return mdef
[docs] async def addStormMacro(self, mdef, user=None): if user is None: user = self.auth.rootuser user.confirm(('storm', 'macro', 'add'), default=True) mdef = self._initStormMacro(mdef, user=user) reqValidStormMacro(mdef) return await self._push('storm:macro:add', mdef)
def _initStormMacro(self, mdef, user=None): if user is None: user = self.auth.rootuser mdef['iden'] = s_common.guid() now = s_common.now() mdef.setdefault('updated', now) mdef.setdefault('created', now) useriden = mdef.get('user', user.iden) mdef['user'] = useriden mdef['creator'] = useriden mdef.setdefault('storm', '') self._initEasyPerm(mdef) mdef['permissions']['users'][useriden] = s_cell.PERM_ADMIN return mdef @s_nexus.Pusher.onPush('storm:macro:add') async def _addStormMacro(self, mdef): name = mdef.get('name') reqValidStormMacro(mdef) # idempotency protection... oldv = self.getStormMacro(name) if oldv is not None and oldv.get('iden') != mdef.get('iden'): raise s_exc.BadArg(mesg=f'Duplicate macro name: {name}') self.slab.put(name.encode(), s_msgpack.en(mdef), db=self.macrodb) await self.feedBeholder('storm:macro:add', {'macro': mdef}) return mdef
[docs] async def delStormMacro(self, name, user=None): if user is not None: self._reqStormMacroPerm(user, name, s_cell.PERM_ADMIN) return await self._push('storm:macro:del', name)
@s_nexus.Pusher.onPush('storm:macro:del') async def _delStormMacro(self, name): if not name: raise s_exc.BadArg(mesg=f'Macro names must be at least 1 character long') byts = self.slab.pop(name.encode(), db=self.macrodb) if byts is not None: macro = s_msgpack.un(byts) await self.feedBeholder('storm:macro:del', {'name': name, 'iden': macro.get('iden')}) return macro
[docs] async def modStormMacro(self, name, info, user=None): if user is not None: self._reqStormMacroPerm(user, name, s_cell.PERM_EDIT) return await self._push('storm:macro:mod', name, info)
@s_nexus.Pusher.onPush('storm:macro:mod') async def _modStormMacro(self, name, info): mdef = self.getStormMacro(name) if mdef is None: return mdef.update(info) reqValidStormMacro(mdef) newname = info.get('name') if newname is not None and newname != name: byts = self.slab.get(newname.encode(), db=self.macrodb) if byts is not None: raise s_exc.DupName('A macro named {newname} already exists!') self.slab.put(newname.encode(), s_msgpack.en(mdef), db=self.macrodb) self.slab.pop(name.encode(), db=self.macrodb) else: self.slab.put(name.encode(), s_msgpack.en(mdef), db=self.macrodb) await self.feedBeholder('storm:macro:mod', {'macro': mdef, 'info': info}) return mdef
[docs] async def setStormMacroPerm(self, name, scope, iden, level, user=None): if user is not None: self._reqStormMacroPerm(user, name, s_cell.PERM_ADMIN) return await self._push('storm:macro:set:perm', name, scope, iden, level)
@s_nexus.Pusher.onPush('storm:macro:set:perm') async def _setStormMacroPerm(self, name, scope, iden, level): mdef = self.reqStormMacro(name) await self._setEasyPerm(mdef, scope, iden, level) reqValidStormMacro(mdef) self.slab.put(name.encode(), s_msgpack.en(mdef), db=self.macrodb) info = { 'scope': scope, 'iden': iden, 'level': level } await self.feedBeholder('storm:macro:set:perm', {'macro': mdef, 'info': info}) return mdef
[docs] async def getStormMacros(self, user=None): retn = [] for lkey, byts in self.slab.scanByFull(db=self.macrodb): await asyncio.sleep(0) mdef = s_msgpack.un(byts) if user is not None and not self._hasEasyPerm(mdef, user, s_cell.PERM_READ): continue retn.append(mdef) return retn
[docs] async def getStormIfaces(self, name): mods = self.modsbyiface.get(name) if mods is not None: return mods mods = [] for moddef in self.stormmods.values(): ifaces = moddef.get('interfaces') if ifaces is None: continue if name not in ifaces: continue mods.append(moddef) self.modsbyiface[name] = tuple(mods) return mods
def _initCorePerms(self): self._cortex_permdefs.extend(( {'perm': ('model', 'form', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding extended model forms.'}, {'perm': ('model', 'form', 'add', '<form>'), 'gate': 'cortex', 'desc': 'Controls access to adding specific extended model forms.', 'ex': 'model.form.add._foo:bar'}, {'perm': ('model', 'form', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting extended model forms.'}, {'perm': ('model', 'form', 'del', '<form>'), 'gate': 'cortex', 'desc': 'Controls access to deleting specific extended model forms.', 'ex': 'model.form.del._foo:bar'}, {'perm': ('model', 'type', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding extended model types.'}, {'perm': ('model', 'type', 'add', '<type>'), 'gate': 'cortex', 'desc': 'Controls access to adding specific extended model types.', 'ex': 'model.type.add._foo:bar'}, {'perm': ('model', 'type', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting extended model types.'}, {'perm': ('model', 'type', 'del', '<type>'), 'gate': 'cortex', 'desc': 'Controls access to deleting specific extended model types.', 'ex': 'model.type.del._foo:bar'}, {'perm': ('model', 'prop', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding extended model properties.'}, {'perm': ('model', 'prop', 'add', '<form>'), 'gate': 'cortex', 'desc': 'Controls access to adding specific extended model properties.', 'ex': 'model.prop.add._foo:bar'}, {'perm': ('model', 'prop', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting extended model properties and values.'}, {'perm': ('model', 'prop', 'del', '<form>'), 'gate': 'cortex', 'desc': 'Controls access to deleting specific extended model properties and values.', 'ex': 'model.prop.del._foo:bar'}, {'perm': ('model', 'tagprop', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding extended model tag properties and values.'}, {'perm': ('model', 'tagprop', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting extended model tag properties and values.'}, {'perm': ('model', 'univ', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding extended model universal properties.'}, {'perm': ('model', 'univ', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting extended model universal properties and values.'}, {'perm': ('node',), 'gate': 'layer', 'desc': 'Controls all node edits in a layer.'}, {'perm': ('node', 'add'), 'gate': 'layer', 'desc': 'Controls adding any form of node in a layer.'}, {'perm': ('node', 'del'), 'gate': 'layer', 'desc': 'Controls removing any form of node in a layer.'}, {'perm': ('node', 'add', '<form>'), 'gate': 'layer', 'ex': 'node.add.inet:ipv4', 'desc': 'Controls adding a specific form of node in a layer.'}, {'perm': ('node', 'del', '<form>'), 'gate': 'layer', 'desc': 'Controls removing a specific form of node in a layer.'}, {'perm': ('node', 'tag'), 'gate': 'layer', 'desc': 'Controls editing any tag on any node in a layer.'}, {'perm': ('node', 'tag', 'add'), 'gate': 'layer', 'desc': 'Controls adding any tag on any node in a layer.'}, {'perm': ('node', 'tag', 'del'), 'gate': 'layer', 'desc': 'Controls removing any tag on any node in a layer.'}, {'perm': ('node', 'tag', 'add', '<tag...>'), 'gate': 'layer', 'ex': 'node.tag.add.cno.mal.redtree', 'desc': 'Controls adding a specific tag on any node in a layer.'}, {'perm': ('node', 'tag', 'del', '<tag...>'), 'gate': 'layer', 'ex': 'node.tag.del.cno.mal.redtree', 'desc': 'Controls removing a specific tag on any node in a layer.'}, {'perm': ('node', 'prop'), 'gate': 'layer', 'desc': 'Controls editing any prop on any node in the layer.'}, {'perm': ('node', 'prop', 'set'), 'gate': 'layer', 'desc': 'Controls setting any prop on any node in a layer.'}, {'perm': ('node', 'prop', 'set', '<form>'), 'gate': 'layer', 'ex': 'node.prop.set.inet:ipv4', 'desc': 'Controls setting any property on a form of node in a layer.'}, {'perm': ('node', 'prop', 'set', '<form>', '<prop>'), 'gate': 'layer', 'ex': 'node.prop.set.inet:ipv4.asn', 'desc': 'Controls setting a specific property on a form of node in a layer.'}, {'perm': ('node', 'prop', 'del'), 'gate': 'layer', 'desc': 'Controls removing any prop on any node in a layer.'}, {'perm': ('node', 'prop', 'del', '<form>'), 'gate': 'layer', 'ex': 'node.prop.del.inet:ipv4', 'desc': 'Controls removing any property from a form of node in a layer.'}, {'perm': ('node', 'prop', 'del', '<form>', '<prop>'), 'gate': 'layer', 'ex': 'node.prop.del.inet:ipv4.asn', 'desc': 'Controls removing a specific property from a form of node in a layer.'}, {'perm': ('node', 'data', 'set'), 'gate': 'layer', 'desc': 'Permits a user to set node data in a given layer.'}, {'perm': ('node', 'data', 'set', '<key>'), 'gate': 'layer', 'ex': 'node.data.set.hehe', 'desc': 'Permits a user to set node data in a given layer for a specific key.'}, {'perm': ('node', 'data', 'pop'), 'gate': 'layer', 'desc': 'Permits a user to remove node data in a given layer.'}, {'perm': ('node', 'data', 'pop', '<key>'), 'gate': 'layer', 'ex': 'node.data.pop.hehe', 'desc': 'Permits a user to remove node data in a given layer for a specific key.'}, {'perm': ('pkg', 'add'), 'gate': 'cortex', 'desc': 'Controls access to adding storm packages.'}, {'perm': ('pkg', 'del'), 'gate': 'cortex', 'desc': 'Controls access to deleting storm packages.'}, {'perm': ('storm', 'asroot', 'cmd', '<cmdname>'), 'gate': 'cortex', 'desc': 'Controls running storm commands requiring root privileges.', 'ex': 'storm.asroot.cmd.movetag'}, {'perm': ('storm', 'asroot', 'mod', '<modname>'), 'gate': 'cortex', 'desc': 'Controls importing modules requiring root privileges.', 'ex': 'storm.asroot.cmd.synapse-misp.privsep'}, {'perm': ('storm', 'graph', 'add'), 'gate': 'cortex', 'desc': 'Controls access to add a storm graph.', 'default': True}, {'perm': ('storm', 'macro', 'add'), 'gate': 'cortex', 'desc': 'Controls access to add a storm macro.', 'default': True}, {'perm': ('storm', 'macro', 'admin'), 'gate': 'cortex', 'desc': 'Controls access to edit/set/delete a storm macro.'}, {'perm': ('storm', 'macro', 'edit'), 'gate': 'cortex', 'desc': 'Controls access to edit a storm macro.'}, {'perm': ('task', 'get'), 'gate': 'cortex', 'desc': 'Controls access to view other users tasks.'}, {'perm': ('task', 'del'), 'gate': 'cortex', 'desc': 'Controls access to terminate other users tasks.'}, {'perm': ('view',), 'gate': 'cortex', 'desc': 'Controls all view permissions.'}, {'perm': ('view', 'add'), 'gate': 'cortex', 'desc': 'Controls access to add a new view including forks.'}, {'perm': ('view', 'del'), 'gate': 'view', 'desc': 'Controls access to delete a view.'}, {'perm': ('view', 'fork'), 'gate': 'view', 'default': True, 'desc': 'Controls access to fork a view.'}, {'perm': ('view', 'read'), 'gate': 'view', 'desc': 'Controls read access to view.'}, {'perm': ('view', 'set', '<setting>'), 'gate': 'view', 'desc': 'Controls access to change view settings.', 'ex': 'view.set.name'}, {'perm': ('axon', 'upload'), 'gate': 'cortex', 'desc': 'Controls the ability to upload a file to the Axon.'}, {'perm': ('axon', 'get'), 'gate': 'cortex', 'desc': 'Controls the ability to retrieve a file from the Axon.'}, {'perm': ('axon', 'has'), 'gate': 'cortex', 'desc': 'Controls the ability to check if the Axon contains a file.'}, {'perm': ('axon', 'del'), 'gate': 'cortex', 'desc': 'Controls the ability to remove a file from the Axon.'}, )) for pdef in self._cortex_permdefs: s_storm.reqValidPermDef(pdef) def _getPermDefs(self): permdefs = list(s_cell.Cell._getPermDefs(self)) permdefs.extend(self._cortex_permdefs) for spkg in self._getStormPkgs(): permdefs.extend(spkg.get('perms', ())) for (path, ctor) in self.stormlibs: permdefs.extend(ctor._storm_lib_perms) permdefs.sort(key=lambda x: x['perm']) return tuple(permdefs) def _setPropSetHook(self, name, hook): self._propSetHooks[name] = hook async def _callPropSetHook(self, node, prop, norm): hook = self._propSetHooks.get(prop.full) if hook is None: return await hook(node, prop, norm) async def _execCellUpdates(self): await self._bumpCellVers('cortex:defaults', ( (1, self._addAllLayrRead), (2, self._viewNomergeToProtected), )) async def _addAllLayrRead(self): layriden = self.getView().layers[0].iden role = await self.auth.getRoleByName('all') await role.addRule((True, ('layer', 'read')), gateiden=layriden)
[docs] async def initServiceRuntime(self): # do any post-nexus initialization here... if self.isactive: await self._checkNexsIndx() await self._initCoreMods() if self.isactive: await self._checkLayerModels() self.addActiveCoro(self.agenda.runloop) await self._initStormDmons() await self._initStormSvcs() # share ourself via the cell dmon as "cortex" # for potential default remote use self.dmon.share('cortex', self)
[docs] async def initServiceActive(self): await self.stormdmons.start() await self.agenda.clearRunningStatus() async def _runMigrations(): # Run migrations when this cortex becomes active. This is to prevent # migrations getting skipped in a zero-downtime upgrade path # (upgrade mirror, promote mirror). await self._checkLayerModels() # Once migrations are complete, start the view and layer tasks. for view in self.views.values(): await view.initTrigTask() await view.initMergeTask() for layer in self.layers.values(): await layer.initLayerActive() self.runActiveTask(_runMigrations()) await self.initStormPool()
[docs] async def initServicePassive(self): await self.stormdmons.stop() for view in self.views.values(): await view.finiTrigTask() await view.finiMergeTask() for layer in self.layers.values(): await layer.initLayerPassive() await self.finiStormPool()
[docs] async def initStormPool(self): try: byts = self.slab.get(b'storm:pool', db='cell:conf') if byts is None: return url, opts = s_msgpack.un(byts) self.stormpoolurl = url self.stormpoolopts = opts async def onlink(proxy, urlinfo): _url = s_urlhelp.sanitizeUrl(s_telepath.zipurl(urlinfo)) logger.debug(f'Stormpool client connected to {_url}') self.stormpool = await s_telepath.open(url, onlink=onlink) # make this one a fini weakref vs the fini() handler self.onfini(self.stormpool) except Exception as e: # pragma: no cover logger.exception(f'Error starting stormpool, it will not be available: {e}')
[docs] async def finiStormPool(self): if self.stormpool is not None: await self.stormpool.fini() self.stormpool = None
[docs] async def getStormPool(self): byts = self.slab.get(b'storm:pool', db='cell:conf') if byts is None: return None return s_msgpack.un(byts)
[docs] @s_nexus.Pusher.onPushAuto('storm:pool:set') async def setStormPool(self, url, opts): s_schemas.reqValidStormPoolOpts(opts) info = (url, opts) self.slab.put(b'storm:pool', s_msgpack.en(info), db='cell:conf') if self.isactive: await self.finiStormPool() await self.initStormPool()
[docs] @s_nexus.Pusher.onPushAuto('storm:pool:del') async def delStormPool(self): self.slab.pop(b'storm:pool', db='cell:conf') if self.isactive: await self.finiStormPool()
[docs] @s_nexus.Pusher.onPushAuto('model:lock:prop') async def setPropLocked(self, name, locked): prop = self.model.reqProp(name) self.modellocks.set(f'prop/{name}', locked) prop.locked = locked
[docs] @s_nexus.Pusher.onPushAuto('model:lock:univ') async def setUnivLocked(self, name, locked): prop = self.model.reqUniv(name) self.modellocks.set(f'univ/{name}', locked) for prop in self.model.getAllUnivs(name): prop.locked = locked
[docs] @s_nexus.Pusher.onPushAuto('model:lock:tagprop') async def setTagPropLocked(self, name, locked): prop = self.model.reqTagProp(name) self.modellocks.set(f'tagprop/{name}', locked) prop.locked = locked
[docs] @s_nexus.Pusher.onPushAuto('model:depr:lock') async def setDeprLock(self, name, locked): todo = [] prop = self.model.prop(name) if prop is not None and prop.deprecated: todo.append(prop) _type = self.model.type(name) if _type is not None and _type.deprecated: todo.append(_type) if not todo: mesg = 'setDeprLock() called on non-existant or non-deprecated form, property, or type.' raise s_exc.NoSuchProp(name=name, mesg=mesg) self.deprlocks.set(name, locked) for elem in todo: elem.locked = locked
[docs] async def getDeprLocks(self): ''' Return a dictionary of deprecated properties and their lock status. ''' retn = {} for prop in self.model.props.values(): if not prop.deprecated: continue # Skip universal properties on other props if not prop.isform and prop.univ is not None: continue retn[prop.full] = prop.locked return retn
async def _warnDeprLocks(self): # Check for deprecated properties which are unused and unlocked deprs = await self.getDeprLocks() count = 0 for propname, locked in deprs.items(): if locked: continue prop = self.model.props.get(propname) for layr in self.layers.values(): if not prop.isform and prop.isuniv: if await layr.getUnivPropCount(prop.name, maxsize=1): break else: if await layr.getPropCount(propname, maxsize=1): break if await layr.getPropCount(prop.form.name, prop.name, maxsize=1): break else: count += 1 if count: mesg = f'Detected {count} deprecated properties unlocked and not in use, ' mesg += 'recommend locking (https://v.vtx.lk/deprlock).' logger.warning(mesg)
[docs] async def reqValidStormGraph(self, gdef): for filt in gdef.get('filters', ()): await self.getStormQuery(filt) for pivo in gdef.get('filters', ()): await self.getStormQuery(pivo) for form, rule in gdef.get('forms', {}).items(): if form != '*' and self.model.form(form) is None: raise s_exc.NoSuchForm.init(form) for filt in rule.get('filters', ()): await self.getStormQuery(filt) for pivo in rule.get('filters', ()): await self.getStormQuery(pivo)
[docs] async def addStormGraph(self, gdef, user=None): if user is None: user = self.auth.rootuser user.confirm(('storm', 'graph', 'add'), default=True) self._initEasyPerm(gdef) now = s_common.now() gdef['iden'] = s_common.guid() gdef['scope'] = 'user' gdef['creator'] = user.iden gdef['created'] = now gdef['updated'] = now gdef['permissions']['users'][user.iden] = s_cell.PERM_ADMIN s_stormlib_graph.reqValidGdef(gdef) await self.reqValidStormGraph(gdef) return await self._push('storm:graph:add', gdef)
@s_nexus.Pusher.onPush('storm:graph:add') async def _addStormGraph(self, gdef): s_stormlib_graph.reqValidGdef(gdef) await self.reqValidStormGraph(gdef) if gdef['scope'] == 'power-up': mesg = 'Power-up graph projections may only be added by power-ups.' raise s_exc.SynErr(mesg=mesg) iden = gdef['iden'] if self.graphs.get(iden) is not None: return self.graphs.set(iden, gdef) await self.feedBeholder('storm:graph:add', {'gdef': gdef}) return copy.deepcopy(gdef) def _reqStormGraphPerm(self, user, iden, level): gdef = self.graphs.get(iden) if gdef is None: gdef = self.pkggraphs.get(iden) if gdef is None: mesg = f'No graph projection with iden {iden} exists!' raise s_exc.NoSuchIden(mesg=mesg) if gdef['scope'] == 'power-up' and level > s_cell.PERM_READ: mesg = 'Power-up graph projections may not be modified.' raise s_exc.AuthDeny(mesg=mesg, user=user.iden, username=user.name) if user is not None: mesg = f'User requires {s_cell.permnames.get(level)} permission on graph: {iden}.' self._reqEasyPerm(gdef, user, level, mesg=mesg) return gdef
[docs] async def delStormGraph(self, iden, user=None): self._reqStormGraphPerm(user, iden, s_cell.PERM_ADMIN) return await self._push('storm:graph:del', iden)
@s_nexus.Pusher.onPush('storm:graph:del') async def _delStormGraph(self, iden): gdef = self.graphs.pop(iden, None) if gdef is not None: await self.feedBeholder('storm:graph:del', {'iden': iden}) return gdef
[docs] async def getStormGraph(self, iden, user=None): gdef = self._reqStormGraphPerm(user, iden, s_cell.PERM_READ) return copy.deepcopy(gdef)
[docs] async def getStormGraphs(self, user=None): for _, gdef in self.graphs.items(): await asyncio.sleep(0) if user is not None and self._hasEasyPerm(gdef, user, s_cell.PERM_READ): yield copy.deepcopy(gdef) for gdef in self.pkggraphs.values(): await asyncio.sleep(0) if user is not None and self._hasEasyPerm(gdef, user, s_cell.PERM_READ): yield copy.deepcopy(gdef)
[docs] async def modStormGraph(self, iden, info, user=None): self._reqStormGraphPerm(user, iden, s_cell.PERM_EDIT) info['updated'] = s_common.now() return await self._push('storm:graph:mod', iden, info)
@s_nexus.Pusher.onPush('storm:graph:mod') async def _modStormGraph(self, iden, info): gdef = self._reqStormGraphPerm(None, iden, s_cell.PERM_EDIT) gdef = copy.deepcopy(gdef) gdef.update(info) s_stormlib_graph.reqValidGdef(gdef) await self.reqValidStormGraph(gdef) self.graphs.set(iden, gdef) await self.feedBeholder('storm:graph:mod', {'gdef': gdef}) return copy.deepcopy(gdef)
[docs] async def setStormGraphPerm(self, gden, scope, iden, level, user=None): self._reqStormGraphPerm(user, gden, s_cell.PERM_ADMIN) return await self._push('storm:graph:set:perm', gden, scope, iden, level, s_common.now())
@s_nexus.Pusher.onPush('storm:graph:set:perm') async def _setStormGraphPerm(self, gden, scope, iden, level, utime): gdef = self._reqStormGraphPerm(None, gden, s_cell.PERM_ADMIN) gdef = copy.deepcopy(gdef) gdef['updated'] = utime await self._setEasyPerm(gdef, scope, iden, level) s_stormlib_graph.reqValidGdef(gdef) self.graphs.set(gden, gdef) await self.feedBeholder('storm:graph:set:perm', {'gdef': gdef}) return copy.deepcopy(gdef)
[docs] async def addCoreQueue(self, name, info): if self.multiqueue.exists(name): mesg = f'Queue named {name} already exists!' raise s_exc.DupName(mesg=mesg) await self._push('queue:add', name, info)
@s_nexus.Pusher.onPush('queue:add') async def _addCoreQueue(self, name, info): if self.multiqueue.exists(name): return await self.auth.addAuthGate(f'queue:{name}', 'queue') creator = info.get('creator') if creator is not None: user = await self.auth.reqUser(creator) await user.setAdmin(True, gateiden=f'queue:{name}', logged=False) await self.multiqueue.add(name, info)
[docs] async def listCoreQueues(self): return self.multiqueue.list()
[docs] async def getCoreQueue(self, name): return self.multiqueue.status(name)
[docs] async def delCoreQueue(self, name): if not self.multiqueue.exists(name): mesg = f'No queue named {name} exists!' raise s_exc.NoSuchName(mesg=mesg) await self._push('queue:del', name) await self.auth.delAuthGate(f'queue:{name}')
@s_nexus.Pusher.onPush('queue:del') async def _delCoreQueue(self, name): if not self.multiqueue.exists(name): return await self.multiqueue.rem(name)
[docs] async def coreQueueGet(self, name, offs=0, cull=True, wait=False): if offs and cull: await self.coreQueueCull(name, offs - 1) async for item in self.multiqueue.gets(name, offs, cull=False, wait=wait): return item
[docs] async def coreQueueGets(self, name, offs=0, cull=True, wait=False, size=None): if offs and cull: await self.coreQueueCull(name, offs - 1) count = 0 async for item in self.multiqueue.gets(name, offs, cull=False, wait=wait): yield item count += 1 if size is not None and count >= size: return
[docs] async def coreQueuePuts(self, name, items): return await self._push('queue:puts', name, items)
@s_nexus.Pusher.onPush('queue:puts', passitem=True) async def _coreQueuePuts(self, name, items, nexsitem): nexsoff, nexsmesg = nexsitem return await self.multiqueue.puts(name, items, reqid=nexsoff)
[docs] @s_nexus.Pusher.onPushAuto('queue:cull') async def coreQueueCull(self, name, offs): await self.multiqueue.cull(name, offs)
[docs] @s_nexus.Pusher.onPushAuto('queue:pop') async def coreQueuePop(self, name, offs): return await self.multiqueue.pop(name, offs)
[docs] async def coreQueueSize(self, name): return self.multiqueue.size(name)
[docs] @s_nexus.Pusher.onPushAuto('tag:model:set') async def setTagModel(self, tagname, name, valu): ''' Set a model specification property for a tag. Arguments: tagname (str): The name of the tag. name (str): The name of the property. valu (object): The value of the property. Tag Model Properties: regex - A list of None or regular expression strings to match each tag level. prune - A number that determines how many levels of pruning are desired. Examples: await core.setTagModel("cno.cve", "regex", (None, None, "[0-9]{4}", "[0-9]{5}")) ''' meta = self.tagmeta.get(tagname) if meta is None: meta = {} meta[name] = valu reqValidTagModel(meta) self.tagmeta.set(tagname, meta) # clear cached entries if name == 'regex': self.tagvalid.clear() elif name == 'prune': self.tagprune.clear()
[docs] @s_nexus.Pusher.onPushAuto('tag:model:del') async def delTagModel(self, tagname): ''' Delete all the model specification properties for a tag. Arguments: tagname (str): The name of the tag. ''' self.tagmeta.pop(tagname) self.tagvalid.clear() self.tagprune.clear()
[docs] @s_nexus.Pusher.onPushAuto('tag:model:pop') async def popTagModel(self, tagname, name): ''' Pop a property from the model specification of a tag. Arguments: tagname (str): The name of the tag. name (str): The name of the specification property. Returns: (object): The current value of the property. ''' meta = self.tagmeta.get(tagname) if meta is None: return None retn = meta.pop(name, None) self.tagmeta.set(tagname, meta) if name == 'regex': self.tagvalid.clear() elif name == 'prune': self.tagprune.clear() return retn
[docs] def isTagValid(self, tagname): ''' Check if a tag name is valid according to tag model regular expressions. Returns: (bool): True if the tag is valid. ''' return self.tagvalid.get(tagname)
def _isTagValid(self, tagname): parts = s_chop.tagpath(tagname) for tag in s_chop.tags(tagname): meta = self.tagmeta.get(tag) if meta is None: continue regx = meta.get('regex') if regx is None: continue for i in range(min(len(regx), len(parts))): if regx[i] is None: continue if not regex.fullmatch(regx[i], parts[i]): mesg = f'Tag part ({parts[i]}) of tag ({tagname}) does not match the tag model regex: [{regx[i]}]' return (False, mesg) return (True, None)
[docs] async def getTagPrune(self, tagname): return self.tagprune.get(tagname)
def _getTagPrune(self, tagname): prune = [] pruning = 0 for tag in s_chop.tags(tagname): if pruning: pruning -= 1 prune.append(tag) continue meta = self.tagmeta.get(tag) if meta is None: continue pruning = meta.get('prune', 0) if pruning: pruning -= 1 prune.append(tag) # if we dont reach the final tag for pruning, skip it. if prune and not prune[-1] == tagname: return () return tuple(prune)
[docs] async def getTagModel(self, tagname): ''' Retrieve the tag model specification for a tag. Returns: (dict): The tag model specification or None. ''' retn = self.tagmeta.get(tagname) if retn is not None: return dict(retn)
[docs] async def listTagModel(self): ''' Retrieve a list of the tag model specifications. Returns: ([(str, dict), ...]): A list of tag model specification tuples. ''' return list(self.tagmeta.items())
async def _finiStor(self): await asyncio.gather(*[view.fini() for view in self.views.values()]) await asyncio.gather(*[layr.fini() for layr in self.layers.values()]) async def _initRuntFuncs(self): async def onSetTrigDoc(node, prop, valu): valu = str(valu) iden = node.ndef[1] node.snap.user.confirm(('trigger', 'set', 'doc'), gateiden=iden) await node.snap.view.setTriggerInfo(iden, 'doc', valu) node.props[prop.name] = valu async def onSetTrigName(node, prop, valu): valu = str(valu) iden = node.ndef[1] node.snap.user.confirm(('trigger', 'set', 'name'), gateiden=iden) await node.snap.view.setTriggerInfo(iden, 'name', valu) node.props[prop.name] = valu async def onSetCronDoc(node, prop, valu): valu = str(valu) iden = node.ndef[1] node.snap.user.confirm(('cron', 'set', 'doc'), gateiden=iden) await self.editCronJob(iden, 'doc', valu) node.props[prop.name] = valu async def onSetCronName(node, prop, valu): valu = str(valu) iden = node.ndef[1] node.snap.user.confirm(('cron', 'set', 'name'), gateiden=iden) await self.editCronJob(iden, 'name', valu) node.props[prop.name] = valu self.addRuntPropSet('syn:cron:doc', onSetCronDoc) self.addRuntPropSet('syn:cron:name', onSetCronName) self.addRuntPropSet('syn:trigger:doc', onSetTrigDoc) self.addRuntPropSet('syn:trigger:name', onSetTrigName) async def _initStormDmons(self): for iden, ddef in self.stormdmondefs.items(): try: await self.runStormDmon(iden, ddef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception as e: logger.warning(f'initStormDmon ({iden}) failed: {e}') async def _initStormSvcs(self): for iden, sdef in self.svcdefs.items(): try: await self._setStormSvc(sdef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception as e: logger.warning(f'initStormService ({iden}) failed: {e}') async def _initCoreQueues(self): path = os.path.join(self.dirn, 'slabs', 'queues.lmdb') slab = await s_lmdbslab.Slab.anit(path) self.onfini(slab.fini) self.multiqueue = await slab.getMultiQueue('cortex:queue', nexsroot=self.nexsroot) async def _initStormGraphs(self): path = os.path.join(self.dirn, 'slabs', 'graphs.lmdb') slab = await s_lmdbslab.Slab.anit(path) self.onfini(slab.fini) self.pkggraphs = {} self.graphs = s_lmdbslab.SlabDict(slab, db=slab.initdb('graphs'))
[docs] async def setStormCmd(self, cdef): await self._reqStormCmd(cdef) return await self._push('cmd:set', cdef)
@s_nexus.Pusher.onPush('cmd:set') async def _onSetStormCmd(self, cdef): ''' Set pure storm command definition. Args: cdef (dict): A Pure Stormcmd definition dictionary. Notes: The definition dictionary is formatted like the following:: { 'name': <name>, 'cmdargs': [ (<name>, <opts>), ] 'cmdconf': { <str>: <valu> }, 'storm': <text>, } ''' name = cdef.get('name') await self._setStormCmd(cdef) self.cmddefs.set(name, cdef) async def _reqStormCmd(self, cdef): name = cdef.get('name') if not s_grammar.isCmdName(name): raise s_exc.BadCmdName(name=name) await self.getStormQuery(cdef.get('storm')) async def _getStorNodes(self, buid, layers): # NOTE: This API lives here to make it easy to optimize # the cluster case to minimize round trips return [await layr.getStorNode(buid) for layr in layers] async def _genSodeList(self, buid, sodes, layers, filtercmpr=None): sodelist = [] if filtercmpr is not None: filt = True for layr in layers[-1::-1]: sode = sodes.get(layr.iden) if sode is None: sode = await layr.getStorNode(buid) if filt and filtercmpr(sode): return else: filt = False sodelist.append((layr.iden, sode)) return (buid, sodelist[::-1]) for layr in layers: sode = sodes.get(layr.iden) if sode is None: sode = await layr.getStorNode(buid) sodelist.append((layr.iden, sode)) return (buid, sodelist) async def _mergeSodes(self, layers, genrs, cmprkey, filtercmpr=None, reverse=False): lastbuid = None sodes = {} async for layr, (_, buid), sode in s_common.merggenr2(genrs, cmprkey, reverse=reverse): if not buid == lastbuid or layr in sodes: if lastbuid is not None: sodelist = await self._genSodeList(lastbuid, sodes, layers, filtercmpr) if sodelist is not None: yield sodelist sodes.clear() lastbuid = buid sodes[layr] = sode if lastbuid is not None: sodelist = await self._genSodeList(lastbuid, sodes, layers, filtercmpr) if sodelist is not None: yield sodelist async def _liftByDataName(self, name, layers): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByDataName(name): yield (buid, [(layr, sode)]) return genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByDataName(name))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_buid): yield sodes async def _liftByProp(self, form, prop, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByProp(form, prop, reverse=reverse): yield (buid, [(layr, sode)]) return genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByProp(form, prop, reverse=reverse))) def filtercmpr(sode): if (props := sode.get('props')) is None: return False return props.get(prop) is not None async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes async def _liftByPropValu(self, form, prop, cmprvals, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByPropValu(form, prop, cmprvals, reverse=reverse): yield (buid, [(layr, sode)]) return def filtercmpr(sode): props = sode.get('props') if props is None: return False return props.get(prop) is not None for cval in cmprvals: genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByPropValu(form, prop, (cval,), reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes async def _liftByPropArray(self, form, prop, cmprvals, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByPropArray(form, prop, cmprvals, reverse=reverse): yield (buid, [(layr, sode)]) return if prop is None: filtercmpr = None else: def filtercmpr(sode): props = sode.get('props') if props is None: return False return props.get(prop) is not None for cval in cmprvals: genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByPropArray(form, prop, (cval,), reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes async def _liftByFormValu(self, form, cmprvals, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByFormValu(form, cmprvals, reverse=reverse): yield (buid, [(layr, sode)]) return for cval in cmprvals: genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByFormValu(form, (cval,), reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, reverse=reverse): yield sodes async def _liftByTag(self, tag, form, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByTag(tag, form, reverse=reverse): yield (buid, [(layr, sode)]) return if form is None: def filtercmpr(sode): tags = sode.get('tags') if tags is None: return False return tags.get(tag) is not None else: filtercmpr = None genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByTag(tag, form, reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_buid, filtercmpr, reverse=reverse): yield sodes async def _liftByTagValu(self, tag, cmpr, valu, form, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByTagValu(tag, cmpr, valu, form, reverse=reverse): yield (buid, [(layr, sode)]) return def filtercmpr(sode): tags = sode.get('tags') if tags is None: return False return tags.get(tag) is not None genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByTagValu(tag, cmpr, valu, form, reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_buid, filtercmpr, reverse=reverse): yield sodes async def _liftByTagProp(self, form, tag, prop, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByTagProp(form, tag, prop, reverse=reverse): yield (buid, [(layr, sode)]) return genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByTagProp(form, tag, prop, reverse=reverse))) def filtercmpr(sode): if (tagprops := sode.get('tagprops')) is None: return False if (props := tagprops.get(tag)) is None: return False return props.get(prop) is not None async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes async def _liftByTagPropValu(self, form, tag, prop, cmprvals, layers, reverse=False): if len(layers) == 1: layr = layers[0].iden async for _, buid, sode in layers[0].liftByTagPropValu(form, tag, prop, cmprvals, reverse=reverse): yield (buid, [(layr, sode)]) return def filtercmpr(sode): tagprops = sode.get('tagprops') if tagprops is None: return False props = tagprops.get(tag) if not props: return False return props.get(prop) is not None for cval in cmprvals: genrs = [] for layr in layers: genrs.append(wrap_liftgenr(layr.iden, layr.liftByTagPropValu(form, tag, prop, (cval,), reverse=reverse))) async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes async def _setStormCmd(self, cdef): ''' Note: No change control or persistence ''' def ctor(runt, runtsafe): return s_storm.PureCmd(cdef, runt, runtsafe) # TODO unify class ctors and func ctors vs briefs... def getCmdBrief(): return cdef.get('descr', 'No description').strip().split('\n')[0] ctor.getCmdBrief = getCmdBrief ctor.pkgname = cdef.get('pkgname') ctor.svciden = cdef.get('cmdconf', {}).get('svciden', '') ctor.forms = cdef.get('forms', {}) def getStorNode(form): ndef = (form.name, form.type.norm(cdef.get('name'))[0]) buid = s_common.buid(ndef) props = { 'doc': ctor.getCmdBrief() } inpt = ctor.forms.get('input') outp = ctor.forms.get('output') nodedata = ctor.forms.get('nodedata') if inpt: props['input'] = tuple(inpt) if outp: props['output'] = tuple(outp) if nodedata: props['nodedata'] = tuple(nodedata) if ctor.svciden: props['svciden'] = ctor.svciden if ctor.pkgname: props['package'] = ctor.pkgname pnorms = {} for prop, valu in props.items(): formprop = form.props.get(prop) if formprop is not None and valu is not None: pnorms[prop] = formprop.type.norm(valu)[0] return (buid, { 'ndef': ndef, 'props': pnorms, }) ctor.getStorNode = getStorNode name = cdef.get('name') self.stormcmds[name] = ctor await self.fire('core:cmd:change', cmd=name, act='add') async def _popStormCmd(self, name): self.stormcmds.pop(name, None) await self.fire('core:cmd:change', cmd=name, act='del')
[docs] async def delStormCmd(self, name): ''' Remove a previously set pure storm command. ''' ctor = self.stormcmds.get(name) if ctor is None: mesg = f'No storm command named {name}.' raise s_exc.NoSuchCmd(name=name, mesg=mesg) return await self._push('cmd:del', name)
@s_nexus.Pusher.onPush('cmd:del') async def _delStormCmd(self, name): ctor = self.stormcmds.get(name) if ctor is None: return cdef = self.cmddefs.get(name) if cdef is None: mesg = f'The storm command ({name}) is not dynamic.' raise s_exc.CantDelCmd(mesg=mesg) self.cmddefs.pop(name) self.stormcmds.pop(name, None) await self.fire('core:cmd:change', cmd=name, act='del')
[docs] async def addStormPkg(self, pkgdef, verify=False): ''' Add the given storm package to the cortex. This will store the package for future use. ''' # do validation before nexs... if verify: pkgcopy = s_msgpack.deepcopy(pkgdef) codesign = pkgcopy.pop('codesign', None) if codesign is None: mesg = 'Storm package is not signed!' raise s_exc.BadPkgDef(mesg=mesg) certbyts = codesign.get('cert') if certbyts is None: mesg = 'Storm package has no certificate!' raise s_exc.BadPkgDef(mesg=mesg) signbyts = codesign.get('sign') if signbyts is None: mesg = 'Storm package has no signature!' raise s_exc.BadPkgDef(mesg=mesg) try: cert = self.certdir.loadCertByts(certbyts.encode('utf-8')) except s_exc.BadCertBytes as e: raise s_exc.BadPkgDef(mesg='Storm package has malformed certificate!') from None try: self.certdir.valCodeCert(certbyts.encode()) except s_exc.BadCertVerify as e: mesg = e.get('mesg') if mesg: mesg = f'Storm package has invalid certificate: {mesg}' else: mesg = 'Storm package has invalid certificate!' raise s_exc.BadPkgDef(mesg=mesg) from None pubk = s_rsa.PubKey(cert.public_key()) if not pubk.verifyitem(pkgcopy, s_common.uhex(signbyts)): mesg = 'Storm package signature does not match!' raise s_exc.BadPkgDef(mesg=mesg) await self._normStormPkg(pkgdef) return await self._push('pkg:add', pkgdef)
@s_nexus.Pusher.onPush('pkg:add') async def _addStormPkg(self, pkgdef): name = pkgdef.get('name') olddef = self.pkgdefs.get(name, None) if olddef is not None: if s_hashitem.hashitem(pkgdef) != s_hashitem.hashitem(olddef): await self._dropStormPkg(olddef) else: return await self.loadStormPkg(pkgdef) self.pkgdefs.set(name, pkgdef) self._clearPermDefs() gates = [] perms = [] pkgperms = pkgdef.get('perms') if pkgperms: gates = [p['gate'] for p in pkgperms if p.get('gate') is not None] perms = [p['perm'] for p in pkgperms if p.get('perm') is not None] await self.feedBeholder('pkg:add', pkgdef, gates=gates, perms=perms)
[docs] async def delStormPkg(self, name): pkgdef = self.pkgdefs.get(name) if pkgdef is None: mesg = f'No storm package: {name}.' raise s_exc.NoSuchPkg(mesg=mesg) return await self._push('pkg:del', name)
@s_nexus.Pusher.onPush('pkg:del') async def _delStormPkg(self, name): ''' Delete a storm package by name. ''' pkgdef = self.pkgdefs.pop(name, None) if pkgdef is None: return await self._dropStormPkg(pkgdef) self._clearPermDefs() gates = [] perms = [] pkgperms = pkgdef.get('perms') if pkgperms: gates = [p['gate'] for p in pkgperms if p.get('gate') is not None] perms = [p['perm'] for p in pkgperms if p.get('perm') is not None] await self.feedBeholder('pkg:del', {'name': name}, gates=gates, perms=perms)
[docs] async def getStormPkg(self, name): return copy.deepcopy(self.stormpkgs.get(name))
[docs] async def getStormPkgs(self): return self._getStormPkgs()
def _getStormPkgs(self): return copy.deepcopy(list(self.pkgdefs.values()))
[docs] async def getStormMods(self): return copy.deepcopy(self.stormmods)
[docs] async def getStormMod(self, name, reqvers=None): mdef = copy.deepcopy(self.stormmods.get(name)) if mdef is None or reqvers is None: return mdef pkgvers = mdef.get('pkgvers') if pkgvers is None: mesg = f'getStormMod: requested storm module {name}@{reqvers}' \ 'has no version information to check.' logger.warning(mesg) return if isinstance(pkgvers, tuple): pkgvers = '%d.%d.%d' % pkgvers if s_version.matches(pkgvers, reqvers): return mdef
[docs] def getDataModel(self): return self.model
async def _tryLoadStormPkg(self, pkgdef): try: await self._normStormPkg(pkgdef, validstorm=False) await self.loadStormPkg(pkgdef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception as e: name = pkgdef.get('name', '') logger.exception(f'Error loading pkg: {name}, {str(e)}')
[docs] async def verifyStormPkgDeps(self, pkgdef): result = { 'requires': [], 'conflicts': [], } deps = pkgdef.get('depends') if deps is None: return result requires = deps.get('requires', ()) for require in requires: pkgname = require.get('name') cmprvers = require.get('version') item = require.copy() item.setdefault('desc', None) cpkg = await self.getStormPkg(pkgname) if cpkg is None: item.update({'ok': False, 'actual': None}) else: cver = cpkg.get('version') ok = s_version.matches(cver, cmprvers) item.update({'ok': ok, 'actual': cver}) result['requires'].append(item) conflicts = deps.get('conflicts', ()) for conflict in conflicts: pkgname = conflict.get('name') cmprvers = conflict.get('version') item = conflict.copy() item.setdefault('version', None) item.setdefault('desc', None) cpkg = await self.getStormPkg(pkgname) if cpkg is None: item.update({'ok': True, 'actual': None}) else: cver = cpkg.get('version') ok = cmprvers is not None and not s_version.matches(cver, cmprvers) item.update({'ok': ok, 'actual': cver}) result['conflicts'].append(item) return result
async def _reqStormPkgDeps(self, pkgdef): name = pkgdef.get('name') deps = await self.verifyStormPkgDeps(pkgdef) for require in deps['requires']: if require['ok']: continue option = ' ' if require.get('optional'): option = ' optional ' mesg = f'Storm package {name}{option}requirement {require.get("name")}{require.get("version")} is currently unmet.' logger.debug(mesg) for conflict in deps['conflicts']: if conflict['ok']: continue mesg = f'Storm package {name} conflicts with {conflict.get("name")}{conflict.get("version") or ""}.' raise s_exc.StormPkgConflicts(mesg=mesg) def _reqStormPkgVarType(self, pkgname, vartype): if isinstance(vartype, (tuple, list)): for vtyp in vartype: self._reqStormPkgVarType(pkgname, vtyp) else: if vartype not in self.model.types: mesg = f'Storm package {pkgname} has unknown config var type {vartype}.' raise s_exc.NoSuchType(mesg=mesg, type=vartype) async def _normStormPkg(self, pkgdef, validstorm=True): ''' Normalize and validate a storm package (optionally storm code). ''' version = pkgdef.get('version') if isinstance(version, (tuple, list)): pkgdef['version'] = '%d.%d.%d' % tuple(version) await self._reqStormPkgDeps(pkgdef) pkgname = pkgdef.get('name') # Check synapse version requirement reqversion = pkgdef.get('synapse_version') if reqversion is not None: mesg = f'Storm package {pkgname} requires Synapse {reqversion} but ' \ f'Cortex is running {s_version.version}' s_version.reqVersion(s_version.version, reqversion, mesg=mesg) elif (minversion := pkgdef.get('synapse_minversion')) is not None: # This is for older packages that might not have the # `synapse_version` field. # TODO: Remove this whole else block after Synapse 3.0.0. if tuple(minversion) > s_version.version: mesg = f'Storm package {pkgname} requires Synapse {minversion} but ' \ f'Cortex is running {s_version.version}' raise s_exc.BadVersion(mesg=mesg) # Validate storm contents from modules and commands mods = pkgdef.get('modules', ()) cmds = pkgdef.get('commands', ()) onload = pkgdef.get('onload') svciden = pkgdef.get('svciden') if onload is not None and validstorm: await self.getStormQuery(onload) for mdef in mods: mdef.setdefault('modconf', {}) if svciden: mdef['modconf']['svciden'] = svciden if validstorm: modtext = mdef.get('storm') await self.getStormQuery(modtext) for cdef in cmds: cdef['pkgname'] = pkgname cdef.setdefault('cmdconf', {}) if svciden: cdef['cmdconf']['svciden'] = svciden if validstorm: cmdtext = cdef.get('storm') await self.getStormQuery(cmdtext) for gdef in pkgdef.get('graphs', ()): gdef['iden'] = s_common.guid((pkgname, gdef.get('name'))) gdef['scope'] = 'power-up' gdef['power-up'] = pkgname if validstorm: await self.reqValidStormGraph(gdef) # Validate package def (post normalization) s_storm.reqValidPkgdef(pkgdef) for configvar in pkgdef.get('configvars', ()): self._reqStormPkgVarType(pkgname, configvar.get('type'))
[docs] async def loadStormPkg(self, pkgdef): ''' Load a storm package into the storm library for this cortex. NOTE: This will *not* persist the package (allowing service dynamism). ''' self.modsbyiface.clear() name = pkgdef.get('name') mods = pkgdef.get('modules', ()) cmds = pkgdef.get('commands', ()) # now actually load... self.stormpkgs[name] = pkgdef pkgvers = pkgdef.get('version') # copy the mods dict and smash the ref so # updates are atomic and dont effect running # storm queries. stormmods = self.stormmods.copy() for mdef in mods: mdef = mdef.copy() modname = mdef.get('name') mdef['pkgvers'] = pkgvers stormmods[modname] = mdef self.stormmods = stormmods for cdef in cmds: await self._setStormCmd(cdef) for gdef in pkgdef.get('graphs', ()): gdef = copy.deepcopy(gdef) self._initEasyPerm(gdef) self.pkggraphs[gdef['iden']] = gdef onload = pkgdef.get('onload') if onload is not None and self.isactive: async def _onload(): try: async for mesg in self.storm(onload): if mesg[0] == 'print': logger.info(f'{name} onload output: {mesg[1].get("mesg")}') if mesg[0] == 'warn': logger.warning(f'{name} onload output: {mesg[1].get("mesg")}') if mesg[0] == 'err': logger.error(f'{name} onload output: {mesg[1]}') await asyncio.sleep(0) except asyncio.CancelledError: # pragma: no cover raise except Exception: # pragma: no cover logger.warning(f'onload failed for package: {name}') await self.fire('core:pkg:onload:complete', pkg=name) self.schedCoro(_onload())
async def _dropStormPkg(self, pkgdef): ''' Reverse the process of loadStormPkg() ''' self.modsbyiface.clear() for mdef in pkgdef.get('modules', ()): modname = mdef.get('name') self.stormmods.pop(modname, None) for cdef in pkgdef.get('commands', ()): name = cdef.get('name') await self._popStormCmd(name) pkgname = pkgdef.get('name') for gdef in pkgdef.get('graphs', ()): self.pkggraphs.pop(gdef['iden'], None) self.stormpkgs.pop(pkgname, None)
[docs] def getStormSvc(self, name): ssvc = self.svcsbyiden.get(name) if ssvc is not None: return ssvc ssvc = self.svcsbyname.get(name) if ssvc is not None: return ssvc ssvc = self.svcsbysvcname.get(name) if name is not None: return ssvc
[docs] async def waitStormSvc(self, name, timeout=None): ssvc = self.getStormSvc(name) return await s_coro.event_wait(ssvc.ready, timeout=timeout)
[docs] async def addStormSvc(self, sdef): ''' Add a registered storm service to the cortex. ''' iden = sdef.get('iden') if iden is None: iden = sdef['iden'] = s_common.guid() if self.svcsbyiden.get(iden) is not None: mesg = f'Storm service already exists: {iden}' raise s_exc.DupStormSvc(mesg=mesg) return await self._push('svc:add', sdef)
@s_nexus.Pusher.onPush('svc:add') async def _addStormSvc(self, sdef): iden = sdef.get('iden') ssvc = self.svcsbyiden.get(iden) if ssvc is not None: return ssvc.sdef ssvc = await self._setStormSvc(sdef) self.svcdefs.set(iden, sdef) await self.feedBeholder('svc:add', {'name': sdef.get('name'), 'iden': iden}) return ssvc.sdef
[docs] async def delStormSvc(self, iden): sdef = self.svcdefs.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg, iden=iden) return await self._push('svc:del', iden)
@s_nexus.Pusher.onPush('svc:del') async def _delStormSvc(self, iden): ''' Delete a registered storm service from the cortex. ''' sdef = self.svcdefs.get(iden) if sdef is None: # pragma: no cover return try: if self.isactive: await self.runStormSvcEvent(iden, 'del') except asyncio.CancelledError: # pragma: no cover TODO: remove once py 3.8 only raise except Exception as e: logger.exception(f'service.del hook for service {iden} failed with error: {e}') sdef = self.svcdefs.pop(iden) await self._delStormSvcPkgs(iden) name = sdef.get('name') if name is not None: self.svcsbyname.pop(name, None) ssvc = self.svcsbyiden.pop(iden, None) if ssvc is not None: self.svcsbysvcname.pop(ssvc.svcname, None) await ssvc.fini() await self.feedBeholder('svc:del', {'iden': iden, 'name': name, 'svcname': ssvc.svcname}) async def _delStormSvcPkgs(self, iden): ''' Delete storm packages associated with a service. ''' for pkg in self.getStormSvcPkgs(iden): name = pkg.get('name') if name: await self._delStormPkg(name)
[docs] def getStormSvcPkgs(self, iden): pkgs = [] for _, pdef in self.pkgdefs.items(): pkgiden = pdef.get('svciden') if pkgiden and pkgiden == iden: pkgs.append(pdef) return pkgs
[docs] async def setStormSvcEvents(self, iden, edef): ''' Set the event callbacks for a storm service. Extends the sdef dict. Args: iden (str): The service iden. edef (dict): The events definition. Notes: The edef is formatted like the following:: { <name> : { 'storm': <storm> } } where ``name`` is one of the following items: add Run the given storm '*before* the service is first added (a la service.add), but not on a reconnect. del Run the given storm *after* the service is removed (a la service.del), but not on a disconnect. Returns: dict: An updated storm service definition dictionary. ''' sdef = self.svcdefs.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) sdef['evts'] = edef self.svcdefs.set(iden, sdef) return sdef
async def _runStormSvcAdd(self, iden): sdef = self.svcdefs.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) if sdef.get('added', False): return try: await self.runStormSvcEvent(iden, 'add') except asyncio.CancelledError: # pragma: no cover TODO: remove once py 3.8 only raise except Exception as e: logger.exception(f'runStormSvcEvent service.add failed with error {e}') return sdef['added'] = True self.svcdefs.set(iden, sdef)
[docs] async def runStormSvcEvent(self, iden, name): assert name in ('add', 'del') sdef = self.svcdefs.get(iden) if sdef is None: mesg = f'No storm service with iden: {iden}' raise s_exc.NoSuchStormSvc(mesg=mesg) evnt = sdef.get('evts', {}).get(name, {}).get('storm') if evnt is None: return opts = {'vars': {'cmdconf': {'svciden': iden}}} coro = s_common.aspin(self.storm(evnt, opts=opts)) if name == 'add': await coro else: self.schedCoro(coro)
async def _setStormSvc(self, sdef): ssvc = await s_stormsvc.StormSvcClient.anit(self, sdef) self.onfini(ssvc) self.svcsbyiden[ssvc.iden] = ssvc self.svcsbyname[ssvc.name] = ssvc return ssvc
[docs] def getStormSvcs(self): return list(self.svcsbyiden.values())
# Global stormvars APIs
[docs] async def getStormVar(self, name, default=None): return self.stormvars.get(name, defv=default)
[docs] @s_nexus.Pusher.onPushAuto('stormvar:pop') async def popStormVar(self, name, default=None): return self.stormvars.pop(name, defv=default)
[docs] @s_nexus.Pusher.onPushAuto('stormvar:set') async def setStormVar(self, name, valu): return self.stormvars.set(name, valu)
[docs] async def itemsStormVar(self): for item in self.stormvars.items(): yield item
async def _cortexHealth(self, health): health.update('cortex', 'nominal') async def _migrateTaxonomyIface(self): extforms = await (await self.hive.open(('cortex', 'model', 'forms'))).dict() for formname, basetype, typeopts, typeinfo in extforms.values(): try: ifaces = typeinfo.get('interfaces') if ifaces and 'taxonomy' in ifaces: logger.warning(f'Migrating taxonomy interface on form {formname} to meta:taxonomy.') ifaces = set(ifaces) ifaces.remove('taxonomy') ifaces.add('meta:taxonomy') typeinfo['interfaces'] = tuple(ifaces) await extforms.set(formname, (formname, basetype, typeopts, typeinfo)) except Exception as e: # pragma: no cover logger.exception(f'Taxonomy migration error for form: {formname} (skipped).') async def _loadExtModel(self): self.exttypes = self.cortexdata.getSubKeyVal('model:types:') self.extforms = self.cortexdata.getSubKeyVal('model:forms:') self.extprops = self.cortexdata.getSubKeyVal('model:props:') self.extunivs = self.cortexdata.getSubKeyVal('model:univs:') self.extedges = self.cortexdata.getSubKeyVal('model:edges:') self.exttagprops = self.cortexdata.getSubKeyVal('model:tagprops:') for typename, basetype, typeopts, typeinfo in self.exttypes.values(): try: self.model.addType(typename, basetype, typeopts, typeinfo) except Exception as e: logger.warning(f'Extended type ({typename}) error: {e}') for formname, basetype, typeopts, typeinfo in self.extforms.values(): try: self.model.addType(formname, basetype, typeopts, typeinfo) form = self.model.addForm(formname, {}, ()) except Exception as e: logger.warning(f'Extended form ({formname}) error: {e}') else: if form.type.deprecated: mesg = f'The extended property {formname} is using a deprecated type {form.type.name} which will' \ f' be removed in 3.0.0' logger.warning(mesg) for form, prop, tdef, info in self.extprops.values(): try: prop = self.model.addFormProp(form, prop, tdef, info) except Exception as e: logger.warning(f'ext prop ({form}:{prop}) error: {e}') else: if prop.type.deprecated: mesg = f'The extended property {prop.full} is using a deprecated type {prop.type.name} which will' \ f' be removed in 3.0.0' logger.warning(mesg) for prop, tdef, info in self.extunivs.values(): try: self.model.addUnivProp(prop, tdef, info) except Exception as e: logger.warning(f'ext univ ({prop}) error: {e}') for prop, tdef, info in self.exttagprops.values(): try: self.model.addTagProp(prop, tdef, info) except Exception as e: logger.warning(f'ext tag prop ({prop}) error: {e}') for edge, info in self.extedges.values(): try: self.model.addEdge(edge, info) except Exception as e: logger.warning(f'ext edge ({edge}) error: {e}')
[docs] async def getExtModel(self): ''' Get all extended model properties in the Cortex. Returns: dict: A dictionary containing forms, form properties, universal properties and tag properties. ''' ret = collections.defaultdict(list) for typename, basetype, typeopts, typeinfo in self.exttypes.values(): ret['types'].append((typename, basetype, typeopts, typeinfo)) for formname, basetype, typeopts, typeinfo in self.extforms.values(): ret['forms'].append((formname, basetype, typeopts, typeinfo)) for form, prop, tdef, info in self.extprops.values(): ret['props'].append((form, prop, tdef, info)) for prop, tdef, info in self.extunivs.values(): ret['univs'].append((prop, tdef, info)) for prop, tdef, info in self.exttagprops.values(): ret['tagprops'].append((prop, tdef, info)) for edge, info in self.extedges.values(): ret['edges'].append((edge, info)) ret['version'] = (1, 0) return copy.deepcopy(dict(ret))
[docs] async def addExtModel(self, model): ''' Add an extended model definition to a Cortex from the output of getExtModel(). Args: model (dict): An extended model dictionary. Returns: Bool: True when the model was added. Raises: s_exc.BadFormDef: If a form exists with a different definition than the provided definition. s_exc.BadPropDef: If a property, tagprop, or universal property exists with a different definition than the provided definition. s_exc.BadEdgeDef: If an edge exists with a different definition than the provided definition. ''' # Get our current model definition emodl = await self.getExtModel() amodl = collections.defaultdict(list) types = {info[0]: info for info in model.get('types', ())} forms = {info[0]: info for info in model.get('forms', ())} props = {(info[0], info[1]): info for info in model.get('props', ())} tagprops = {info[0]: info for info in model.get('tagprops', ())} univs = {info[0]: info for info in model.get('univs', ())} edges = {info[0]: info for info in model.get('edges', ())} etyps = {info[0]: info for info in emodl.get('types', ())} efrms = {info[0]: info for info in emodl.get('forms', ())} eprops = {(info[0], info[1]): info for info in emodl.get('props', ())} etagprops = {info[0]: info for info in emodl.get('tagprops', ())} eunivs = {info[0]: info for info in emodl.get('univs', ())} eedges = {info[0]: info for info in emodl.get('edges', ())} for (name, info) in types.items(): enfo = etyps.get(name) if enfo is None: amodl['types'].append(info) continue if enfo == info: continue mesg = f'Extended type definition differs from existing definition for {name}.' raise s_exc.BadTypeDef(mesg=mesg, name=name) for (name, info) in forms.items(): enfo = efrms.get(name) if enfo is None: amodl['forms'].append(info) continue if enfo == info: continue mesg = f'Extended form definition differs from existing definition for {name}.' raise s_exc.BadFormDef(mesg=mesg, name=name) for (name, info) in props.items(): enfo = eprops.get(name) if enfo is None: amodl['props'].append(info) continue if enfo == info: continue mesg = f'Extended prop definition differs from existing definition for {name}' raise s_exc.BadPropDef(mesg=mesg, name=name) for (name, info) in tagprops.items(): enfo = etagprops.get(name) if enfo is None: amodl['tagprops'].append(info) continue if enfo == info: continue mesg = f'Extended tagprop definition differs from existing definition for {name}' raise s_exc.BadPropDef(mesg=mesg, name=name) for (name, info) in univs.items(): enfo = eunivs.get(name) if enfo is None: amodl['univs'].append(info) continue if enfo == info: continue mesg = f'Extended universal property definition differs from existing definition for {name}' raise s_exc.BadPropDef(mesg=mesg, name=name) for (name, info) in edges.items(): enfo = eedges.get(name) if enfo is None: amodl['edges'].append(info) continue if enfo == info: continue (n1form, verb, n2form) = info[0] mesg = f'Extended edge definition differs from existing definition for {info[0]}' raise s_exc.BadEdgeDef(mesg=mesg, n1form=n1form, verb=verb, n2form=n2form) for typename, basetype, typeopts, typeinfo in amodl['types']: await self.addType(typename, basetype, typeopts, typeinfo) for formname, basetype, typeopts, typeinfo in amodl['forms']: await self.addForm(formname, basetype, typeopts, typeinfo) for form, prop, tdef, info in amodl['props']: await self.addFormProp(form, prop, tdef, info) for prop, tdef, info in amodl['tagprops']: await self.addTagProp(prop, tdef, info) for prop, tdef, info in amodl['univs']: await self.addUnivProp(prop, tdef, info) for edge, info in amodl['edges']: await self.addEdge(edge, info) return True
[docs] async def addUnivProp(self, name, tdef, info): if not isinstance(tdef, tuple): mesg = 'Universal property type definitions should be a tuple.' raise s_exc.BadArg(name=name, mesg=mesg) if not isinstance(info, dict): mesg = 'Universal property definitions should be a dict.' raise s_exc.BadArg(name=name, mesg=mesg) # the loading function does the actual validation... if not name.startswith('_'): mesg = 'ext univ name must start with "_"' raise s_exc.BadPropDef(name=name, mesg=mesg) base = '.' + name if base in self.model.props: raise s_exc.DupPropName(mesg=f'Cannot add duplicate universal property {base}', prop=name) await self._push('model:univ:add', name, tdef, info)
@s_nexus.Pusher.onPush('model:univ:add') async def _addUnivProp(self, name, tdef, info): base = '.' + name if base in self.model.props: return self.model.addUnivProp(name, tdef, info) self.extunivs.set(name, (name, tdef, info)) await self.fire('core:extmodel:change', prop=name, act='add', type='univ') base = '.' + name univ = self.model.univ(base) if univ: await self.feedBeholder('model:univ:add', univ.pack())
[docs] async def addForm(self, formname, basetype, typeopts, typeinfo): if not isinstance(typeopts, dict): mesg = 'Form type options should be a dict.' raise s_exc.BadArg(form=formname, mesg=mesg) if not isinstance(typeinfo, dict): mesg = 'Form type info should be a dict.' raise s_exc.BadArg(form=formname, mesg=mesg) if not formname.startswith('_'): mesg = 'Extended form must begin with "_"' raise s_exc.BadFormDef(form=formname, mesg=mesg) if self.model.form(formname) is not None: mesg = f'Form name already exists: {formname}' raise s_exc.DupFormName(mesg=mesg) if self.model.type(formname) is not None: mesg = f'Type already exists: {formname}' raise s_exc.DupTypeName.init(formname) return await self._push('model:form:add', formname, basetype, typeopts, typeinfo)
@s_nexus.Pusher.onPush('model:form:add') async def _addForm(self, formname, basetype, typeopts, typeinfo): if self.model.form(formname) is not None: return ifaces = typeinfo.get('interfaces') if ifaces and 'taxonomy' in ifaces: logger.warning(f'{formname} is using the deprecated taxonomy interface, updating to meta:taxonomy.') ifaces = set(ifaces) ifaces.remove('taxonomy') ifaces.add('meta:taxonomy') typeinfo['interfaces'] = tuple(ifaces) self.model.addType(formname, basetype, typeopts, typeinfo) self.model.addForm(formname, {}, ()) self.extforms.set(formname, (formname, basetype, typeopts, typeinfo)) await self.fire('core:extmodel:change', form=formname, act='add', type='form') form = self.model.form(formname) ftyp = self.model.type(formname) if form and ftyp: await self.feedBeholder('model:form:add', {'form': form.pack(), 'type': ftyp.pack()})
[docs] async def delForm(self, formname): if not formname.startswith('_'): mesg = 'Extended form must begin with "_"' raise s_exc.BadFormDef(form=formname, mesg=mesg) if self.model.form(formname) is None: raise s_exc.NoSuchForm.init(formname) return await self._push('model:form:del', formname)
@s_nexus.Pusher.onPush('model:form:del') async def _delForm(self, formname): if self.model.form(formname) is None: return for layr in self.layers.values(): async for item in layr.iterFormRows(formname): mesg = f'Nodes still exist with form: {formname} in layer {layr.iden}' raise s_exc.CantDelForm(mesg=mesg) self.model.delForm(formname) self.model.delType(formname) self.extforms.pop(formname, None) await self.fire('core:extmodel:change', form=formname, act='del', type='form') await self.feedBeholder('model:form:del', {'form': formname})
[docs] async def addType(self, typename, basetype, typeopts, typeinfo): if not isinstance(typeopts, dict): mesg = 'Type options should be a dict.' raise s_exc.BadArg(type=typename, mesg=mesg) if not isinstance(typeinfo, dict): mesg = 'Type info should be a dict.' raise s_exc.BadArg(type=typename, mesg=mesg) if not typename.startswith('_'): mesg = 'Extended type must begin with "_".' raise s_exc.BadTypeDef(type=typename, mesg=mesg) if self.model.type(typename) is not None: raise s_exc.DupTypeName.init(typename) if (base := self.model.type(basetype)) is None: mesg = f'Specified base type {basetype} does not exist.' raise s_exc.NoSuchType(mesg=mesg, name=basetype) base.clone(typeopts) return await self._push('model:type:add', typename, basetype, typeopts, typeinfo)
@s_nexus.Pusher.onPush('model:type:add') async def _addType(self, typename, basetype, typeopts, typeinfo): if self.model.type(typename) is not None: return ifaces = typeinfo.get('interfaces') if ifaces and 'taxonomy' in ifaces: logger.warning(f'{typename} is using the deprecated taxonomy interface, updating to meta:taxonomy.') ifaces = set(ifaces) ifaces.remove('taxonomy') ifaces.add('meta:taxonomy') typeinfo['interfaces'] = tuple(ifaces) self.model.addType(typename, basetype, typeopts, typeinfo) self.exttypes.set(typename, (typename, basetype, typeopts, typeinfo)) await self.fire('core:extmodel:change', name=typename, act='add', type='type') if (_type := self.model.type(typename)) is not None: await self.feedBeholder('model:type:add', {'type': _type.pack()})
[docs] async def delType(self, typename): if not typename.startswith('_'): mesg = 'Extended type must begin with "_".' raise s_exc.BadTypeDef(type=typename, mesg=mesg) if self.model.type(typename) is None: raise s_exc.NoSuchType.init(typename) return await self._push('model:type:del', typename)
@s_nexus.Pusher.onPush('model:type:del') async def _delType(self, typename): if self.model.type(typename) is None: return self.model.delType(typename) self.exttypes.pop(typename, None) await self.fire('core:extmodel:change', name=typename, act='del', type='type') await self.feedBeholder('model:type:del', {'type': typename})
[docs] async def addFormProp(self, form, prop, tdef, info): if not isinstance(tdef, tuple): mesg = 'Form property type definitions should be a tuple.' raise s_exc.BadArg(form=form, mesg=mesg) if not isinstance(info, dict): mesg = 'Form property definitions should be a dict.' raise s_exc.BadArg(form=form, mesg=mesg) if not prop.startswith('_') and not form.startswith('_'): mesg = 'Extended prop must begin with "_" or be added to an extended form.' raise s_exc.BadPropDef(prop=prop, mesg=mesg) _form = self.model.form(form) if _form is None: raise s_exc.NoSuchForm.init(form) if _form.prop(prop): raise s_exc.DupPropName(mesg=f'Cannot add duplicate form prop {form} {prop}', form=form, prop=prop) await self._push('model:prop:add', form, prop, tdef, info)
@s_nexus.Pusher.onPush('model:prop:add') async def _addFormProp(self, form, prop, tdef, info): if (_form := self.model.form(form)) is not None and _form.prop(prop) is not None: return _prop = self.model.addFormProp(form, prop, tdef, info) if _prop.type.deprecated: mesg = f'The extended property {_prop.full} is using a deprecated type {_prop.type.name} which will' \ f' be removed in 3.0.0' logger.warning(mesg) full = f'{form}:{prop}' self.extprops.set(full, (form, prop, tdef, info)) await self.fire('core:extmodel:change', form=form, prop=prop, act='add', type='formprop') prop = self.model.prop(full) if prop: await self.feedBeholder('model:prop:add', {'form': form, 'prop': prop.pack()})
[docs] async def delFormProp(self, form, prop): self.reqExtProp(form, prop) return await self._push('model:prop:del', form, prop)
async def _delAllFormProp(self, formname, propname, meta): ''' Delete all instances of a property from all layers. NOTE: This does not fire triggers. ''' self.reqExtProp(formname, propname) fullname = f'{formname}:{propname}' prop = self.model.prop(fullname) await self.setPropLocked(fullname, True) for layr in list(self.layers.values()): genr = layr.iterPropRows(formname, propname) async for rows in s_coro.chunks(genr): nodeedits = [] for buid, valu in rows: nodeedits.append((buid, prop.form.name, ( (s_layer.EDIT_PROP_DEL, (prop.name, None, prop.type.stortype), ()), ))) await layr.saveNodeEdits(nodeedits, meta) await asyncio.sleep(0) async def _delAllUnivProp(self, propname, meta): ''' Delete all instances of a universal property from all layers. NOTE: This does not fire triggers. ''' self.reqExtUniv(propname) full = f'.{propname}' prop = self.model.univ(full) await self.setUnivLocked(full, True) for layr in list(self.layers.values()): genr = layr.iterUnivRows(full) async for rows in s_coro.chunks(genr): nodeedits = [] for buid, valu in rows: sode = await layr.getStorNode(buid) nodeedits.append((buid, sode.get('form'), ( (s_layer.EDIT_PROP_DEL, (prop.name, None, prop.type.stortype), ()), ))) await layr.saveNodeEdits(nodeedits, meta) await asyncio.sleep(0) async def _delAllTagProp(self, propname, meta): ''' Delete all instances of a tag property from all layers. NOTE: This does not fire triggers. ''' self.reqExtTagProp(propname) prop = self.model.getTagProp(propname) await self.setTagPropLocked(propname, True) for layr in list(self.layers.values()): for form, tag, tagprop in layr.getTagProps(): if tagprop != propname: # pragma: no cover await asyncio.sleep(0) continue genr = layr.iterTagPropRows(tag, tagprop, form) async for rows in s_coro.chunks(genr): nodeedits = [] for buid, valu in rows: nodeedits.append((buid, form, ( (s_layer.EDIT_TAGPROP_DEL, (tag, prop.name, None, prop.type.stortype), ()), ))) await layr.saveNodeEdits(nodeedits, meta) await asyncio.sleep(0)
[docs] def reqExtProp(self, form, prop): full = f'{form}:{prop}' pdef = self.extprops.get(full) if pdef is None: mesg = f'No ext prop named {full}' raise s_exc.NoSuchProp(form=form, prop=prop, mesg=mesg) return pdef
[docs] def reqExtUniv(self, prop): udef = self.extunivs.get(prop) if udef is None: mesg = f'No ext univ named {prop}' raise s_exc.NoSuchUniv(name=prop, mesg=mesg) return udef
[docs] def reqExtTagProp(self, name): pdef = self.exttagprops.get(name) if pdef is None: mesg = f'No tag prop named {name}' raise s_exc.NoSuchTagProp(mesg=mesg, name=name) return pdef
@s_nexus.Pusher.onPush('model:prop:del') async def _delFormProp(self, form, prop): ''' Remove an extended property from the cortex. ''' full = f'{form}:{prop}' pdef = self.extprops.get(full) if pdef is None: return for layr in self.layers.values(): async for item in layr.iterPropRows(form, prop): mesg = f'Nodes still exist with prop: {form}:{prop} in layer {layr.iden}' raise s_exc.CantDelProp(mesg=mesg) self.model.delFormProp(form, prop) self.extprops.pop(full, None) self.modellocks.pop(f'prop/{full}', None) await self.fire('core:extmodel:change', form=form, prop=prop, act='del', type='formprop') await self.feedBeholder('model:prop:del', {'form': form, 'prop': prop})
[docs] async def delUnivProp(self, prop): self.reqExtUniv(prop) return await self._push('model:univ:del', prop)
@s_nexus.Pusher.onPush('model:univ:del') async def _delUnivProp(self, prop): ''' Remove an extended universal property from the cortex. ''' udef = self.extunivs.get(prop) if udef is None: return univname = '.' + prop for layr in self.layers.values(): async for item in layr.iterUnivRows(univname): mesg = f'Nodes still exist with universal prop: {prop} in layer {layr.iden}' raise s_exc.CantDelUniv(mesg=mesg) self.model.delUnivProp(prop) self.extunivs.pop(prop, None) self.modellocks.pop(f'univ/{prop}', None) await self.fire('core:extmodel:change', name=prop, act='del', type='univ') await self.feedBeholder('model:univ:del', {'prop': univname})
[docs] async def addTagProp(self, name, tdef, info): if not isinstance(tdef, tuple): mesg = 'Tag property type definitions should be a tuple.' raise s_exc.BadArg(name=name, mesg=mesg) if not isinstance(info, dict): mesg = 'Tag property definitions should be a dict.' raise s_exc.BadArg(name=name, mesg=mesg) if self.exttagprops.get(name) is not None: raise s_exc.DupPropName(name=name) return await self._push('model:tagprop:add', name, tdef, info)
@s_nexus.Pusher.onPush('model:tagprop:add') async def _addTagProp(self, name, tdef, info): if self.exttagprops.get(name) is not None: return self.model.addTagProp(name, tdef, info) self.exttagprops.set(name, (name, tdef, info)) await self.fire('core:tagprop:change', name=name, act='add') tagp = self.model.tagprop(name) if tagp: await self.feedBeholder('model:tagprop:add', tagp.pack())
[docs] async def delTagProp(self, name): self.reqExtTagProp(name) return await self._push('model:tagprop:del', name)
@s_nexus.Pusher.onPush('model:tagprop:del') async def _delTagProp(self, name): pdef = self.exttagprops.get(name) if pdef is None: return for layr in self.layers.values(): if await layr.hasTagProp(name): mesg = f'Nodes still exist with tagprop: {name} in layer {layr.iden}' raise s_exc.CantDelProp(mesg=mesg) self.model.delTagProp(name) self.exttagprops.pop(name, None) self.modellocks.pop(f'tagprop/{name}', None) await self.fire('core:tagprop:change', name=name, act='del') await self.feedBeholder('model:tagprop:del', {'tagprop': name})
[docs] async def addEdge(self, edge, edgeinfo): if not isinstance(edgeinfo, dict): mesg = 'Edge info should be a dict.' raise s_exc.BadArg(mesg=mesg, edgeinfo=edgeinfo) (n1form, verb, n2form) = edge if not verb.startswith('_'): mesg = f'Extended edge verb must begin with "_"; got {verb}' raise s_exc.BadEdgeDef(mesg=mesg, n1form=n1form, verb=verb, n2form=n2form) if n1form is not None: self.model._reqFormName(n1form) if n2form is not None: self.model._reqFormName(n2form) if self.model.edge(edge) is not None: raise s_exc.DupEdgeType.init(edge) return await self._push('model:edge:add', edge, edgeinfo)
@s_nexus.Pusher.onPush('model:edge:add') async def _addEdge(self, edge, edgeinfo): if self.model.edge(edge) is not None: return self.model.addEdge(edge, edgeinfo) self.extedges.set(s_common.guid(edge), (edge, edgeinfo)) await self.fire('core:extmodel:change', edge=edge, act='add', type='edge') await self.feedBeholder('model:edge:add', {'edge': edge, 'info': edgeinfo})
[docs] async def delEdge(self, edge): if self.extedges.get(s_common.guid(edge)) is None: raise s_exc.NoSuchEdge.init(edge) return await self._push('model:edge:del', edge)
@s_nexus.Pusher.onPush('model:edge:del') async def _delEdge(self, edge): edgeguid = s_common.guid(edge) if self.extedges.get(edgeguid) is None: return self.model.delEdge(edge) self.extedges.pop(edgeguid, None) await self.fire('core:extmodel:change', edge=edge, act='del', type='edge') await self.feedBeholder('model:edge:del', {'edge': edge})
[docs] async def addNodeTag(self, user, iden, tag, valu=(None, None)): ''' Add a tag to a node specified by iden. Args: iden (str): A hex encoded node BUID. tag (str): A tag string. valu (tuple): A time interval tuple or (None, None). ''' buid = s_common.uhex(iden) async with await self.snap(user=user) as snap: node = await snap.getNodeByBuid(buid) if node is None: raise s_exc.NoSuchIden(iden=iden) await node.addTag(tag, valu=valu) return node.pack()
[docs] async def addNode(self, user, form, valu, props=None): async with await self.snap(user=user) as snap: node = await snap.addNode(form, valu, props=props) return node.pack()
[docs] async def delNodeTag(self, user, iden, tag): ''' Delete a tag from the node specified by iden. Args: iden (str): A hex encoded node BUID. tag (str): A tag string. ''' buid = s_common.uhex(iden) async with await self.snap(user=user) as snap: node = await snap.getNodeByBuid(buid) if node is None: raise s_exc.NoSuchIden(iden=iden) await node.delTag(tag) return node.pack()
async def _onCoreFini(self): ''' Generic fini handler for cortex components which may change or vary at runtime. ''' if self.axon: await self.axon.fini()
[docs] async def syncLayerNodeEdits(self, iden, offs, wait=True): ''' Yield (offs, mesg) tuples for nodeedits in a layer. ''' layr = self.getLayer(iden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {iden}', iden=iden) async for item in layr.syncNodeEdits(offs, wait=wait): yield item
[docs] async def syncLayersEvents(self, offsdict=None, wait=True): ''' Yield (offs, layriden, STYP, item, meta) tuples for nodeedits for *all* layers, interspersed with add/del layer messages. STYP is one of the following constants: SYNC_NODEEDITS: item is a nodeedits (buid, form, edits) SYNC_LAYR_ADD: A layer was added (item and meta are empty) SYNC_LAYR_DEL: A layer was deleted (item and meta are empty) Args: offsdict(Optional(Dict[str,int])): starting nexus/editlog offset by layer iden. Defaults to 0 for unspecified layers or if offsdict is None. wait(bool): whether to pend and stream value until this layer is fini'd ''' async def layrgenr(layr, startoff, endoff=None, newlayer=False): if newlayer: yield layr.addoffs, layr.iden, SYNC_LAYR_ADD, (), {} wait = endoff is None if not layr.isfini: async for ioff, item, meta in layr.syncNodeEdits2(startoff, wait=wait): if endoff is not None and ioff >= endoff: # pragma: no cover break yield ioff, layr.iden, SYNC_NODEEDITS, item, meta await asyncio.sleep(0) if layr.isdeleted: yield layr.deloffs, layr.iden, SYNC_LAYR_DEL, (), {} # End of layrgenr async for item in self._syncNodeEdits(offsdict, layrgenr, wait=wait): yield item
[docs] async def syncIndexEvents(self, matchdef, offsdict=None, wait=True): ''' Yield (offs, layriden, <STYPE>, <item>) tuples from the nodeedit logs of all layers starting from the given nexus/layer offset (they are synchronized). Only edits that match the filter in matchdef will be yielded, plus EDIT_PROGRESS (see layer.syncIndexEvents) messages. The format of the 4th element of the tuple depends on STYPE. STYPE is one of the following constants: SYNC_LAYR_ADD: item is an empty tuple () SYNC_LAYR_DEL: item is an empty tuple () SYNC_NODEEDIT: item is (buid, form, ETYPE, VALS, META)) or (None, None, s_layer.EDIT_PROGRESS, (), ()) For edits in the past, events are yielded in offset order across all layers. For current data (wait=True), events across different layers may be emitted slightly out of offset order. Note: Will not yield any values from layers created with logedits disabled Args: matchdef(Dict[str, Sequence[str]]): a dict describing which events are yielded. See layer.syncIndexEvents for matchdef specification. offsdict(Optional(Dict[str,int])): starting nexus/editlog offset by layer iden. Defaults to 0 for unspecified layers or if offsdict is None. wait(bool): whether to pend and stream value until this layer is fini'd ''' async def layrgenr(layr, startoff, endoff=None, newlayer=False): ''' Yields matching results from a single layer ''' if newlayer: yield layr.addoffs, layr.iden, SYNC_LAYR_ADD, () wait = endoff is None ioff = startoff if not layr.isfini: async for ioff, item in layr.syncIndexEvents(startoff, matchdef, wait=wait): if endoff is not None and ioff >= endoff: # pragma: no cover break yield ioff, layr.iden, SYNC_NODEEDIT, item if layr.isdeleted: yield layr.deloffs, layr.iden, SYNC_LAYR_DEL, () # End of layrgenr async for item in self._syncNodeEdits(offsdict, layrgenr, wait=wait): yield item
async def _syncNodeEdits(self, offsdict, genrfunc, wait=True): ''' Common guts between syncIndexEvents and syncLayersEvents First, it streams from the layers up to the current offset, sorted by offset. Then it streams from all the layers simultaneously. Args: offsdict(Dict[str, int]): starting nexus/editlog offset per layer. Defaults to 0 if layer not present genrfunc(Callable): an async generator function that yields tuples that start with an offset. The input parameters are: layr(Layer): a Layer object startoff(int); the starting offset endoff(Optional[int]): the ending offset newlayer(bool): whether to emit a new layer item first wait(bool): when the end of the log is hit, whether to continue to wait for new entries and yield them ''' catchingup = True # whether we've caught up to topoffs layrsadded = {} # layriden -> True. Captures all the layers added while catching up todo = set() # outstanding futures of active live streaming from layers layrgenrs = {} # layriden -> genr. maps active layers to that layer's async generator # The offset to start from once the catch-up phase is complete topoffs = max(layr.nodeeditlog.index() for layr in self.layers.values()) if offsdict is None: offsdict = {} newtodoevent = asyncio.Event() async with await s_base.Base.anit() as base: def addlayr(layr, newlayer=False, startoffs=topoffs): ''' A new layer joins the live stream ''' genr = genrfunc(layr, startoffs, newlayer=newlayer) layrgenrs[layr.iden] = genr task = base.schedCoro(genr.__anext__()) task.iden = layr.iden todo.add(task) newtodoevent.set() def onaddlayr(mesg): etyp, event = mesg layriden = event['iden'] layr = self.getLayer(layriden) if catchingup: layrsadded[layr] = True return addlayr(layr, newlayer=True) self.on('core:layr:add', onaddlayr, base=base) # First, catch up to what was the current offset when we started, guaranteeing order logger.debug(f'_syncNodeEdits() running catch-up sync to offs={topoffs}') genrs = [genrfunc(layr, offsdict.get(layr.iden, 0), endoff=topoffs) for layr in self.layers.values()] async for item in s_common.merggenr(genrs, lambda x, y: x[0] < y[0]): yield item catchingup = False if not wait: return # After we've caught up, read on genrs from all the layers simultaneously logger.debug('_syncNodeEdits() entering into live sync') lastoffs = {} todo.clear() for layr in self.layers.values(): if layr not in layrsadded: addlayr(layr) for layr in layrsadded: addlayr(layr, newlayer=True) # Also, wake up if we get fini'd finitask = base.schedCoro(self.waitfini()) todo.add(finitask) newtodotask = base.schedCoro(newtodoevent.wait()) todo.add(newtodotask) while not self.isfini: newtodoevent.clear() done, _ = await asyncio.wait(todo, return_when=asyncio.FIRST_COMPLETED) for donetask in done: try: todo.remove(donetask) if donetask is finitask: # pragma: no cover # We were fini'd return if donetask is newtodotask: newtodotask = base.schedCoro(newtodoevent.wait()) todo.add(newtodotask) continue layriden = donetask.iden result = donetask.result() yield result lastoffs[layriden] = result[0] # Re-add a task to wait on the next iteration of the generator genr = layrgenrs[layriden] task = base.schedCoro(genr.__anext__()) task.iden = layriden todo.add(task) except StopAsyncIteration: # Help out the garbage collector del layrgenrs[layriden] layr = self.getLayer(iden=layriden) if layr is None or not layr.logedits: logger.debug(f'_syncNodeEdits() removed {layriden=} from sync') continue startoffs = lastoffs[layriden] + 1 if layriden in lastoffs else topoffs logger.debug(f'_syncNodeEdits() restarting {layriden=} live sync from offs={startoffs}') addlayr(layr, startoffs=startoffs) await self.waitfini(1) async def _initCoreInfo(self): self.stormvars = self.cortexdata.getSubKeyVal('storm:vars:') if self.inaugural: self.stormvars.set(s_stormlib_cell.runtime_fixes_key, s_stormlib_cell.getMaxHotFixes()) async def _initDeprLocks(self): self.deprlocks = self.cortexdata.getSubKeyVal('model:deprlocks:') self.modellocks = self.cortexdata.getSubKeyVal('model:locks:') # TODO: 3.0.0 conversion will truncate this hive key if self.inaugural: locks = ( # 2.87.0 - lock out incorrect crypto model ('crypto:currency:transaction:inputs', True), ('crypto:currency:transaction:outputs', True), ) for k, v in locks: await self._hndlsetDeprLock(k, v) for name, locked in self.deprlocks.items(): form = self.model.form(name) if form is not None: form.locked = locked prop = self.model.prop(name) if prop is not None: prop.locked = locked _type = self.model.type(name) if _type is not None: _type.locked = locked for name, locked in self.modellocks.items(): prop = None elemtype, elemname = name.split('/', 1) if elemtype == 'prop': prop = self.model.prop(elemname) elif elemtype == 'univ': prop = self.model.univ(elemname) if prop is not None: for univ in self.model.getAllUnivs(elemname): univ.locked = locked elif elemtype == 'tagprop': prop = self.model.getTagProp(elemname) if prop is not None: prop.locked = locked async def _initJsonStor(self): self.jsonurl = self.conf.get('jsonstor') if self.jsonurl is not None: async def onlink(proxy: s_telepath.Proxy): logger.debug(f'Connected to remote jsonstor {s_urlhelp.sanitizeUrl(self.jsonurl)}') self.jsonstor = await s_telepath.Client.anit(self.jsonurl, onlink=onlink) else: path = os.path.join(self.dirn, 'jsonstor') jsoniden = s_common.guid((self.iden, 'jsonstor')) idenpath = os.path.join(path, 'cell.guid') # check that the jsonstor cell GUID is what it should be. If not, update it. # ( bugfix for first release where cell was allowed to generate it's own iden ) if os.path.isfile(idenpath): with open(idenpath, 'r') as fd: existiden = fd.read() if jsoniden != existiden: with open(idenpath, 'w') as fd: fd.write(jsoniden) # Disable sysctl checks for embedded jsonstor server conf = {'cell:guid': jsoniden, 'health:sysctl:checks': False} self.jsonstor = await s_jsonstor.JsonStorCell.anit(path, conf=conf, parent=self) self.onfini(self.jsonstor)
[docs] async def getJsonObj(self, path): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.getPathObj(path)
[docs] async def hasJsonObj(self, path): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.hasPathObj(path)
[docs] async def getJsonObjs(self, path): if self.jsonurl is not None: await self.jsonstor.waitready() async for item in self.jsonstor.getPathObjs(path): yield item
[docs] async def getJsonObjProp(self, path, prop): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.getPathObjProp(path, prop)
[docs] async def delJsonObj(self, path): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.delPathObj(path)
[docs] async def delJsonObjProp(self, path, prop): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.delPathObjProp(path, prop)
[docs] async def setJsonObj(self, path, item): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.setPathObj(path, item)
[docs] async def setJsonObjProp(self, path, prop, item): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.setPathObjProp(path, prop, item)
[docs] async def getUserNotif(self, indx): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.getUserNotif(indx)
[docs] async def delUserNotif(self, indx): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.delUserNotif(indx)
[docs] async def addUserNotif(self, useriden, mesgtype, mesgdata=None): if self.jsonurl is not None: await self.jsonstor.waitready() return await self.jsonstor.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)
[docs] async def iterUserNotifs(self, useriden, size=None): if self.jsonurl is not None: await self.jsonstor.waitready() async for item in self.jsonstor.iterUserNotifs(useriden, size=size): yield item
[docs] async def watchAllUserNotifs(self, offs=None): if self.jsonurl is not None: await self.jsonstor.waitready() async for item in self.jsonstor.watchAllUserNotifs(offs=offs): yield item
async def _initCoreAxon(self): turl = self.conf.get('axon') if turl is None: path = os.path.join(self.dirn, 'axon') # Disable sysctl checks for embedded axon server conf = {'health:sysctl:checks': False} proxyurl = self.conf.get('http:proxy') if proxyurl is not None: conf['http:proxy'] = proxyurl cadir = self.conf.get('tls:ca:dir') if cadir is not None: conf['tls:ca:dir'] = cadir self.axon = await s_axon.Axon.anit(path, conf=conf, parent=self) self.axoninfo = await self.axon.getCellInfo() self.axon.onfini(self.axready.clear) self.dynitems['axon'] = self.axon self.axready.set() return async def onlink(proxy: s_telepath.Proxy): logger.debug(f'Connected to remote axon {s_urlhelp.sanitizeUrl(turl)}') async def fini(): self.axready.clear() self.axoninfo = await proxy.getCellInfo() proxy.onfini(fini) self.axready.set() self.axon = await s_telepath.Client.anit(turl, onlink=onlink) self.dynitems['axon'] = self.axon self.onfini(self.axon) async def _initStormCmds(self): ''' Registration for built-in Storm commands. ''' self.addStormCmd(s_storm.MaxCmd) self.addStormCmd(s_storm.MinCmd) self.addStormCmd(s_storm.TeeCmd) self.addStormCmd(s_storm.DiffCmd) self.addStormCmd(s_storm.OnceCmd) self.addStormCmd(s_storm.TreeCmd) self.addStormCmd(s_storm.HelpCmd) self.addStormCmd(s_storm.IdenCmd) self.addStormCmd(s_storm.SpinCmd) self.addStormCmd(s_storm.UniqCmd) self.addStormCmd(s_storm.BatchCmd) self.addStormCmd(s_storm.CountCmd) self.addStormCmd(s_storm.GraphCmd) self.addStormCmd(s_storm.LimitCmd) self.addStormCmd(s_storm.MergeCmd) self.addStormCmd(s_storm.RunAsCmd) self.addStormCmd(s_storm.SleepCmd) self.addStormCmd(s_storm.CopyToCmd) self.addStormCmd(s_storm.DivertCmd) self.addStormCmd(s_storm.ScrapeCmd) self.addStormCmd(s_storm.DelNodeCmd) self.addStormCmd(s_storm.LiftByVerb) self.addStormCmd(s_storm.MoveTagCmd) self.addStormCmd(s_storm.ReIndexCmd) self.addStormCmd(s_storm.EdgesDelCmd) self.addStormCmd(s_storm.ParallelCmd) self.addStormCmd(s_storm.TagPruneCmd) self.addStormCmd(s_storm.ViewExecCmd) self.addStormCmd(s_storm.IntersectCmd) self.addStormCmd(s_storm.MoveNodesCmd) self.addStormCmd(s_storm.BackgroundCmd) self.addStormCmd(s_stormlib_macro.MacroExecCmd) self.addStormCmd(s_stormlib_storm.StormExecCmd) self.addStormCmd(s_stormlib_stats.StatsCountByCmd) self.addStormCmd(s_stormlib_cortex.StormPoolDelCmd) self.addStormCmd(s_stormlib_cortex.StormPoolGetCmd) self.addStormCmd(s_stormlib_cortex.StormPoolSetCmd) for cdef in s_stormsvc.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_storm.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_aha.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_gen.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_auth.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_macro.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_model.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_cortex.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) for cdef in s_stormlib_vault.stormcmds: await self._trySetStormCmd(cdef.get('name'), cdef) async def _initPureStormCmds(self): oldcmds = [] for name, cdef in self.cmddefs.items(): cmdiden = cdef.get('cmdconf', {}).get('svciden') if cmdiden and self.svcdefs.get(cmdiden) is None: oldcmds.append(name) else: await self._trySetStormCmd(name, cdef) for name in oldcmds: logger.warning(f'Removing old command: [{name}]') self.cmddefs.pop(name) for pkgdef in self.pkgdefs.values(): await self._tryLoadStormPkg(pkgdef) async def _trySetStormCmd(self, name, cdef): try: await self._setStormCmd(cdef) except (asyncio.CancelledError, Exception): logger.exception(f'Storm command load failed: {name}') def _initStormLibs(self): ''' Registration for built-in Storm Libraries ''' for path, ctor in s_stormtypes.registry.iterLibs(): # Ensure each ctor's permdefs are valid for pdef in ctor._storm_lib_perms: s_storm.reqValidPermDef(pdef) # Skip libbase which is registered as a default ctor in the storm Runtime if path: self.addStormLib(path, ctor) def _initFeedFuncs(self): ''' Registration for built-in Cortex feed functions. ''' self.setFeedFunc('syn.nodes', self._addSynNodes) def _initCortexHttpApi(self): ''' Registration for built-in Cortex httpapi endpoints ''' self.addHttpApi('/api/v1/feed', s_httpapi.FeedV1, {'cell': self}) self.addHttpApi('/api/v1/storm', s_httpapi.StormV1, {'cell': self}) self.addHttpApi('/api/v1/storm/call', s_httpapi.StormCallV1, {'cell': self}) self.addHttpApi('/api/v1/storm/nodes', s_httpapi.StormNodesV1, {'cell': self}) self.addHttpApi('/api/v1/storm/export', s_httpapi.StormExportV1, {'cell': self}) self.addHttpApi('/api/v1/reqvalidstorm', s_httpapi.ReqValidStormV1, {'cell': self}) self.addHttpApi('/api/v1/storm/vars/set', s_httpapi.StormVarsSetV1, {'cell': self}) self.addHttpApi('/api/v1/storm/vars/get', s_httpapi.StormVarsGetV1, {'cell': self}) self.addHttpApi('/api/v1/storm/vars/pop', s_httpapi.StormVarsPopV1, {'cell': self}) self.addHttpApi('/api/v1/model', s_httpapi.ModelV1, {'cell': self}) self.addHttpApi('/api/v1/model/norm', s_httpapi.ModelNormV1, {'cell': self}) self.addHttpApi('/api/v1/core/info', s_httpapi.CoreInfoV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/del', CortexAxonHttpDelV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/put', CortexAxonHttpUploadV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/has/sha256/([0-9a-fA-F]{64}$)', CortexAxonHttpHasV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/by/sha256/([0-9a-fA-F]{64}$)', CortexAxonHttpBySha256V1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/by/sha256/(.*)', CortexAxonHttpBySha256InvalidV1, {'cell': self}) self.addHttpApi('/api/ext/(.*)', s_httpapi.ExtApiHandler, {'cell': self}) def _initCortexExtHttpApi(self): self._exthttpapis.clear() self._exthttpapicache.clear() byts = self.slab.get(self._exthttpapiorder, self.httpextapidb) if byts is None: return order = s_msgpack.un(byts) for iden in order: byts = self.slab.get(s_common.uhex(iden), self.httpextapidb) if byts is None: # pragma: no cover logger.error(f'Missing HTTP API definition for iden={iden}') continue adef = s_msgpack.un(byts) self._exthttpapis[adef.get('iden')] = adef
[docs] async def addHttpExtApi(self, adef): path = adef.get('path') try: _ = regex.compile(path) except Exception as e: mesg = f'Invalid path for Extended HTTP API - cannot compile regular expression for [{path}] : {e}' raise s_exc.BadArg(mesg=mesg) from None if adef.get('iden') is None: adef['iden'] = s_common.guid() iden = adef['iden'] if self._exthttpapis.get(iden) is not None: raise s_exc.DupIden(mesg=f'Duplicate iden specified for Extended HTTP API: {iden}', iden=iden) adef['created'] = s_common.now() adef['updated'] = adef['created'] adef = s_schemas.reqValidHttpExtAPIConf(adef) return await self._push('http:api:add', adef)
@s_nexus.Pusher.onPush('http:api:add') async def _addHttpExtApi(self, adef): iden = adef.get('iden') self.slab.put(s_common.uhex(iden), s_msgpack.en(adef), db=self.httpextapidb) order = self.slab.get(self._exthttpapiorder, db=self.httpextapidb) if order is None: self.slab.put(self._exthttpapiorder, s_msgpack.en([iden]), db=self.httpextapidb) else: order = s_msgpack.un(order) # type: tuple if iden not in order: # Replay safety order = order + (iden, ) # New handlers go to the end of the list of handlers self.slab.put(self._exthttpapiorder, s_msgpack.en(order), db=self.httpextapidb) # Re-initialize the HTTP API list from the index order self._initCortexExtHttpApi() return adef
[docs] @s_nexus.Pusher.onPushAuto('http:api:del') async def delHttpExtApi(self, iden): if s_common.isguid(iden) is False: raise s_exc.BadArg(mesg=f'Must provide an iden. Got {iden}') self.slab.pop(s_common.uhex(iden), db=self.httpextapidb) byts = self.slab.get(self._exthttpapiorder, self.httpextapidb) order = list(s_msgpack.un(byts)) if iden in order: order.remove(iden) self.slab.put(self._exthttpapiorder, s_msgpack.en(order), db=self.httpextapidb) self._initCortexExtHttpApi() return
[docs] @s_nexus.Pusher.onPushAuto('http:api:mod') async def modHttpExtApi(self, iden, name, valu): # Created, Creator, Updated are not mutable if name in ('name', 'desc', 'runas', 'methods', 'authenticated', 'pool', 'perms', 'readonly', 'vars'): # Schema takes care of these values pass elif name == 'owner': _obj = await self.getUserDef(valu, packroles=False) if _obj is None: raise s_exc.NoSuchUser(mesg=f'Cannot set owner={valu} on extended httpapi, it does not exist.') elif name == 'view': _obj = self.getView(valu) if _obj is None: raise s_exc.NoSuchView(mesg=f'Cannot set view={valu} on extended httpapi, it does not exist.') elif name == 'path': try: _ = regex.compile(valu) except Exception as e: mesg = f'Invalid path for Extended HTTP API - cannot compile regular expression for [{valu}] : {e}' raise s_exc.BadArg(mesg=mesg) from None else: raise s_exc.BadArg(mesg=f'Cannot set {name=} on extended httpapi') byts = self.slab.get(s_common.uhex(iden), db=self.httpextapidb) if byts is None: raise s_exc.NoSuchIden(mesg=f'No http api for {iden=}', iden=iden) adef = s_msgpack.un(byts) adef[name] = valu adef['updated'] = s_common.now() adef = s_schemas.reqValidHttpExtAPIConf(adef) self.slab.put(s_common.uhex(iden), s_msgpack.en(adef), db=self.httpextapidb) self._initCortexExtHttpApi() return adef
[docs] @s_nexus.Pusher.onPushAuto('http:api:indx') async def setHttpApiIndx(self, iden, indx): if indx < 0: raise s_exc.BadArg(mesg=f'indx must be greater than or equal to 0; got {indx}') byts = self.slab.get(self._exthttpapiorder, db=self.httpextapidb) if byts is None: raise s_exc.SynErr(mesg='No Extended HTTP handlers registered. Cannot set order.') order = list(s_msgpack.un(byts)) if iden not in order: raise s_exc.NoSuchIden(mesg=f'Extended HTTP API is not set: {iden}') if order.index(iden) == indx: return indx order.remove(iden) # indx values > length of the list end up at the end of the list. order.insert(indx, iden) self.slab.put(self._exthttpapiorder, s_msgpack.en(order), db=self.httpextapidb) self._initCortexExtHttpApi() return order.index(iden)
[docs] async def getHttpExtApis(self): return copy.deepcopy(list(self._exthttpapis.values()))
[docs] async def getHttpExtApi(self, iden): adef = self._exthttpapis.get(iden) if adef is None: raise s_exc.NoSuchIden(mesg=f'No extended http api for {iden=}', iden=iden) return copy.deepcopy(adef)
[docs] async def getHttpExtApiByPath(self, path): iden, args = self._exthttpapicache.get(path) adef = copy.deepcopy(self._exthttpapis.get(iden)) return adef, args
def _getHttpExtApiByPath(self, key): # Cache callback. # Returns (iden, args) or (None, ()) for caching. for iden, adef in self._exthttpapis.items(): match = regex.fullmatch(adef.get('path'), key) if match is None: continue return iden, match.groups() return None, ()
[docs] async def getCellApi(self, link, user, path): if not path: return await self.cellapi.anit(self, link, user) # allow an admin to directly open the cortex hive # (perhaps this should be a Cell() level pattern) if path[0] == 'hive' and user.isAdmin(): return await self.hiveapi.anit(self.hive, user) if path[0] == 'layer': if len(path) == 1: # get the top layer for the default view layr = self.getLayer() return await self.layerapi.anit(self, link, user, layr) if len(path) == 2: layr = self.getLayer(path[1]) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {path[1]}', iden=path[1]) return await self.layerapi.anit(self, link, user, layr) if path[0] == 'view': view = None if len(path) == 1: view = self.getView(user=user) elif len(path) == 2: view = self.getView(path[1], user=user) if view is not None: return await self.viewapi.anit(self, link, user, view) raise s_exc.NoSuchPath(mesg=f'Invalid telepath path={path}', path=path)
[docs] async def getModelDict(self): return self.model.getModelDict()
[docs] async def getModelDefs(self): return self.model.getModelDefs()
[docs] async def getFormCounts(self): ''' Return total form counts for all existing layers ''' counts = collections.defaultdict(int) for layr in self.layers.values(): layrcounts = await layr.getFormCounts() for name, valu in layrcounts.items(): counts[name] += valu return dict(counts)
[docs] def addRuntLift(self, prop, func): ''' Register a runt lift helper for a given prop. Args: prop (str): Full property name for the prop to register the helper for. func: Returns: None: None. ''' self._runtLiftFuncs[prop] = func
[docs] async def runRuntLift(self, full, valu=None, cmpr=None, view=None): ''' Execute a runt lift function. Args: full (str): Property to lift by. valu: cmpr: Returns: bytes, list: Yields bytes, list tuples where the list contains a series of key/value pairs which are used to construct a Node object. ''' func = self._runtLiftFuncs.get(full) if func is not None: async for pode in func(full, valu, cmpr, view): yield pode
[docs] def addRuntPropSet(self, full, func): ''' Register a prop set helper for a runt form ''' self._runtPropSetFuncs[full] = func
[docs] async def runRuntPropSet(self, node, prop, valu): func = self._runtPropSetFuncs.get(prop.full) if func is None: raise s_exc.IsRuntForm(mesg='No prop:set func set for runt property.', prop=prop.full, valu=valu, ndef=node.ndef) ret = await s_coro.ornot(func, node, prop, valu) return ret
[docs] def addRuntPropDel(self, full, func): ''' Register a prop set helper for a runt form ''' self._runtPropDelFuncs[full] = func
[docs] async def runRuntPropDel(self, node, prop): func = self._runtPropDelFuncs.get(prop.full) if func is None: raise s_exc.IsRuntForm(mesg='No prop:del func set for runt property.', prop=prop.full, ndef=node.ndef) ret = await s_coro.ornot(func, node, prop) return ret
async def _checkLayerModels(self): async with self.enterMigrationMode(): mrev = s_modelrev.ModelRev(self) await mrev.revCoreLayers() async def _loadView(self, vdef): view = await self.viewctor(self, vdef) self.views[view.iden] = view self.dynitems[view.iden] = view async def fini(): self.views.pop(view.iden, None) self.dynitems.pop(view.iden, None) view.onfini(fini) return view def _calcViewsByLayer(self): # keep track of views by layer self.viewsbylayer.clear() for view in self.views.values(): for layr in view.layers: self.viewsbylayer[layr.iden].append(view) async def _initCoreViews(self): defiden = self.cellinfo.get('defaultview') self.viewdefs = self.cortexdata.getSubKeyVal('view:info:') for iden, vdef in self.viewdefs.items(): view = await self._loadView(vdef) if iden == defiden: self.view = view for view in self.views.values(): view.init2() # if we have no views, we are initializing. Add a default main view and layer. if not self.views: assert self.inaugural, 'Cortex initialization failed: there are no views.' ldef = {'name': 'default'} ldef = await self.addLayer(ldef=ldef, nexs=False) layriden = ldef.get('iden') role = await self.auth.getRoleByName('all') await role.addRule((True, ('layer', 'read')), gateiden=layriden, nexs=False) vdef = { 'name': 'default', 'layers': (layriden,), 'worldreadable': True, } vdef = await self.addView(vdef, nexs=False) iden = vdef.get('iden') self.cellinfo.set('defaultview', iden) self.view = self.getView(iden) self._calcViewsByLayer()
[docs] async def addView(self, vdef, nexs=True): vdef['iden'] = s_common.guid() vdef['created'] = s_common.now() vdef.setdefault('parent', None) vdef.setdefault('worldreadable', False) vdef.setdefault('creator', self.auth.rootuser.iden) s_schemas.reqValidView(vdef) if nexs: return await self._push('view:add', vdef) else: return await self._addView(vdef)
@s_nexus.Pusher.onPush('view:add') async def _addView(self, vdef): s_schemas.reqValidView(vdef) iden = vdef['iden'] if iden in self.views: return for lyriden in vdef['layers']: if lyriden not in self.layers: raise s_exc.NoSuchLayer(mesg=f'No such layer {lyriden}', iden=lyriden) creator = vdef.get('creator', self.auth.rootuser.iden) user = await self.auth.reqUser(creator) await self.auth.addAuthGate(iden, 'view') await user.setAdmin(True, gateiden=iden, logged=False) # worldreadable does not get persisted with the view; the state ends up in perms worldread = vdef.pop('worldreadable', False) if worldread: role = await self.auth.getRoleByName('all') await role.addRule((True, ('view', 'read')), gateiden=iden, nexs=False) self.viewdefs.set(iden, vdef) view = await self._loadView(vdef) view.init2() self._calcViewsByLayer() pack = await view.pack() await self.feedBeholder('view:add', pack, gates=[iden]) return pack
[docs] async def delViewWithLayer(self, iden): ''' Delete a Cortex View and its write Layer if not in use by other View stacks. Note: Any children of the View will have their parent View updated to the deleted View's parent (if present). The deleted View's write Layer will also be removed from any child Views which contain it in their Layer stack. If the Layer is used in Views which are not children of the deleted View, the Layer will be preserved, otherwise it will be deleted as well. ''' view = self.views.get(iden) if view is None: raise s_exc.NoSuchView(mesg=f'No such view {iden=}', iden=iden) if view.info.get('protected'): mesg = f'Cannot delete view ({iden}) that has protected set.' raise s_exc.CantDelView(mesg=mesg) layriden = view.layers[0].iden pareiden = None if view.parent is not None: pareiden = view.parent.iden return await self._push('view:delwithlayer', iden, layriden, newparent=pareiden)
@s_nexus.Pusher.onPush('view:delwithlayer', passitem=True) async def _delViewWithLayer(self, viewiden, layriden, nexsitem, newparent=None): if viewiden == self.view.iden: raise s_exc.SynErr(mesg='Cannot delete the main view') if (view := self.views.get(viewiden)) is not None: self.viewdefs.pop(viewiden) await view.delete() self._calcViewsByLayer() await self.feedBeholder('view:del', {'iden': viewiden}, gates=[viewiden]) await self.auth.delAuthGate(viewiden) if newparent is not None: newview = self.views.get(newparent) layrinuse = False for view in self.viewsbylayer[layriden]: if not view.isForkOf(viewiden): layrinuse = True continue view.layers = [lyr for lyr in view.layers if lyr.iden != layriden] layridens = [lyr.iden for lyr in view.layers] view.info['layers'] = layridens mesg = {'iden': view.iden, 'layers': layridens} await self.feedBeholder('view:setlayers', mesg, gates=[view.iden, layridens[0]]) if view.parent.iden == viewiden: if newparent is None: view.parent = None view.info['parent'] = None else: view.parent = newview view.info['parent'] = newparent mesg = {'iden': view.iden, 'name': 'parent', 'valu': newparent} await self.feedBeholder('view:set', mesg, gates=[view.iden, layridens[0]]) self.viewdefs.set(view.iden, view.info) if not layrinuse and (layr := self.layers.get(layriden)) is not None: del self.layers[layriden] for pdef in layr.layrinfo.get('pushs', {}).values(): await self.delActiveCoro(pdef.get('iden')) for pdef in layr.layrinfo.get('pulls', {}).values(): await self.delActiveCoro(pdef.get('iden')) await self.feedBeholder('layer:del', {'iden': layriden}, gates=[layriden]) await self.auth.delAuthGate(layriden) self.dynitems.pop(layriden) self.layerdefs.pop(layriden) await layr.delete() layr.deloffs = nexsitem[0]
[docs] async def delView(self, iden): view = self.views.get(iden) if view is None: raise s_exc.NoSuchView(mesg=f'No such view {iden=}', iden=iden) if view.info.get('protected'): mesg = f'Cannot delete view ({iden}) that has protected set.' raise s_exc.CantDelView(mesg=mesg) return await self._push('view:del', iden)
@s_nexus.Pusher.onPush('view:del') async def _delView(self, iden): ''' Delete a cortex view by iden. Note: This does not delete any of the view's layers ''' view = self.views.get(iden, None) if view is None: return if iden == self.view.iden: raise s_exc.SynErr(mesg='Cannot delete the main view') for cview in self.views.values(): if cview.parent is not None and cview.parent.iden == iden: raise s_exc.SynErr(mesg='Cannot delete a view that has children') self.viewdefs.pop(iden) await view.delete() self._calcViewsByLayer() await self.feedBeholder('view:del', {'iden': iden}, gates=[iden]) await self.auth.delAuthGate(iden)
[docs] async def delLayer(self, iden): layr = self.layers.get(iden, None) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {iden}', iden=iden) return await self._push('layer:del', iden)
@s_nexus.Pusher.onPush('layer:del', passitem=True) async def _delLayer(self, iden, nexsitem): layr = self.layers.get(iden, None) if layr is None: return for view in self.views.values(): if layr in view.layers: raise s_exc.LayerInUse(iden=iden) del self.layers[iden] for pdef in layr.layrinfo.get('pushs', {}).values(): await self.delActiveCoro(pdef.get('iden')) for pdef in layr.layrinfo.get('pulls', {}).values(): await self.delActiveCoro(pdef.get('iden')) await self.feedBeholder('layer:del', {'iden': iden}, gates=[iden]) await self.auth.delAuthGate(iden) self.dynitems.pop(iden) self.layerdefs.pop(iden) await layr.delete() layr.deloffs = nexsitem[0]
[docs] async def setViewLayers(self, layers, iden=None): ''' Args: layers ([str]): A top-down list of of layer guids iden (str): The view iden (defaults to default view). ''' view = self.getView(iden) if view is None: raise s_exc.NoSuchView(mesg=f'No such view {iden=}', iden=iden) await view.setLayers(layers) self._calcViewsByLayer()
[docs] def getLayer(self, iden=None): ''' Get a Layer object. Args: iden (str): The layer iden to retrieve. Returns: Layer: A Layer object. ''' if iden is None: return self.view.layers[0] # For backwards compatibility, resolve references to old layer iden == cortex.iden to the main layer # TODO: due to our migration policy, remove in 3.0.0 if iden == self.iden: return self.view.layers[0] return self.layers.get(iden)
[docs] def reqLayer(self, iden=None): layr = self.getLayer(iden=iden) if layr is None: mesg = f'No layer found with iden: {iden}' raise s_exc.NoSuchLayer(mesg=mesg, iden=iden) return layr
[docs] def listLayers(self): return self.layers.values()
[docs] async def getLayerDef(self, iden=None): layr = self.getLayer(iden) if layr is not None: return await layr.pack()
[docs] async def getLayerDefs(self): return [await lyr.pack() for lyr in list(self.layers.values())]
[docs] def getView(self, iden=None, user=None): ''' Get a View object. Args: iden (str): The View iden to retrieve. Returns: View: A View object. ''' if iden is None: if user is not None: iden = user.profile.get('cortex:view') if iden is None: iden = self.view.iden # For backwards compatibility, resolve references to old view iden == cortex.iden to the main view # TODO: due to our migration policy, remove in 3.0.0 if iden == self.iden: iden = self.view.iden view = self.views.get(iden) if view is None: return None if user is not None: user.confirm(('view', 'read'), gateiden=iden) return view
[docs] def reqView(self, iden, mesg=None): view = self.getView(iden) if view is None: # pragma: no cover if mesg is None: mesg = f'No view with iden: {iden}' raise s_exc.NoSuchView(mesg=mesg, iden=iden) return view
[docs] def listViews(self): return list(self.views.values())
[docs] async def getViewDef(self, iden): view = self.getView(iden=iden) if view is not None: return await view.pack()
[docs] async def getViewDefs(self, deporder=False): views = list(self.views.values()) if not deporder: return [await v.pack() for v in views] def depth(view): x = 0 llen = len(view.layers) while view: x += 1 view = view.parent return (x, llen) views.sort(key=lambda x: depth(x)) return [await v.pack() for v in views]
[docs] async def addLayer(self, ldef=None, nexs=True): ''' Add a Layer to the cortex. Args: ldef (Optional[Dict]): layer configuration nexs (bool): whether to record a nexus transaction (internal use only) ''' ldef = ldef or {} ldef['iden'] = s_common.guid() ldef['created'] = s_common.now() ldef.setdefault('creator', self.auth.rootuser.iden) ldef.setdefault('lockmemory', self.conf.get('layers:lockmemory')) ldef.setdefault('logedits', self.conf.get('layers:logedits')) ldef.setdefault('readonly', False) s_layer.reqValidLdef(ldef) if nexs: return await self._push('layer:add', ldef) else: return await self._addLayer(ldef, (None, None))
async def _twinLayer(self, oldlayr): newldef = s_msgpack.deepcopy(oldlayr.layrinfo) newldef.pop('iden', None) newldef = await self.addLayer(newldef) newlayr = self.reqLayer(newldef.get('iden')) oldinfo = self.auth.reqAuthGate(oldlayr.iden).pack() for userinfo in oldinfo.get('users', ()): user = self.auth.user(userinfo.get('iden')) if user is None: # pragma: no cover continue if userinfo.get('admin'): await user.setAdmin(True, gateiden=newlayr.iden) for rule in userinfo.get('rules', ()): await user.addRule(rule, gateiden=newlayr.iden) for roleinfo in oldinfo.get('roles', ()): role = self.auth.role(roleinfo.get('iden')) if role is None: # pragma: no cover continue for rule in roleinfo.get('rules', ()): await role.addRule(rule, gateiden=newlayr.iden) return newlayr
[docs] @s_nexus.Pusher.onPushAuto('layer:swap') async def swapLayer(self, oldiden, newiden): ''' Atomically swap out a layer from all views that contain it. ''' self.reqLayer(oldiden) self.reqLayer(newiden) for view in list(self.views.values()): await asyncio.sleep(0) oldlayers = view.info.get('layers') if oldiden not in oldlayers: continue newlayers = list(oldlayers) newlayers[oldlayers.index(oldiden)] = newiden await view._setLayerIdens(newlayers)
@s_nexus.Pusher.onPush('layer:add', passitem=True) async def _addLayer(self, ldef, nexsitem): s_layer.reqValidLdef(ldef) iden = ldef.get('iden') if iden in self.layers: return layr = self.layers.get(iden) if layr is not None: return await layr.pack() creator = ldef.get('creator') user = await self.auth.reqUser(creator) self.layerdefs.set(iden, ldef) layr = await self._initLayr(ldef, nexsoffs=nexsitem[0]) await user.setAdmin(True, gateiden=iden, logged=False) # forward wind the new layer to the current model version await layr._setModelVers(s_modelrev.maxvers) if self.isactive: await layr.initLayerActive() else: await layr.initLayerPassive() pack = await layr.pack() await self.feedBeholder('layer:add', pack, gates=[iden]) return pack def _checkMaxNodes(self, delta=1): if self.maxnodes is None: return remain = self.maxnodes - self.nodecount if remain < delta: mesg = f'Cortex is at node:count limit: {self.maxnodes}' raise s_exc.HitLimit(mesg=mesg) async def _initLayr(self, layrinfo, nexsoffs=None): ''' Instantiate a Layer() instance via the provided layer info HiveDict. ''' layr = await self._ctorLayr(layrinfo) layr.addoffs = nexsoffs self.layers[layr.iden] = layr self.dynitems[layr.iden] = layr if self.maxnodes: counts = await layr.getFormCounts() self.nodecount += sum(counts.values()) def onadd(): self.nodecount += 1 def ondel(): self.nodecount -= 1 layr.nodeAddHook = onadd layr.nodeDelHook = ondel await self.auth.addAuthGate(layr.iden, 'layer') for pdef in layrinfo.get('pushs', {}).values(): await self.runLayrPush(layr, pdef) for pdef in layrinfo.get('pulls', {}).values(): await self.runLayrPull(layr, pdef) await self.fire('core:layr:add', iden=layr.iden) return layr async def _ctorLayr(self, layrinfo): ''' Actually construct the Layer instance for the given HiveDict. ''' return await s_layer.Layer.anit(self, layrinfo) async def _initCoreLayers(self): self.layerdefs = self.cortexdata.getSubKeyVal('layer:info:') for ldef in self.layerdefs.values(): await self._initLayr(ldef)
[docs] @s_nexus.Pusher.onPushAuto('layer:push:add') async def addLayrPush(self, layriden, pdef): s_schemas.reqValidPush(pdef) iden = pdef.get('iden') layr = self.layers.get(layriden) if layr is None: return pushs = layr.layrinfo.get('pushs') if pushs is None: pushs = {} # handle last-message replay if pushs.get(iden) is not None: return pushs[iden] = pdef layr.layrinfo['pushs'] = pushs self.layerdefs.set(layr.iden, layr.layrinfo) await self.runLayrPush(layr, pdef)
[docs] @s_nexus.Pusher.onPushAuto('layer:push:del') async def delLayrPush(self, layriden, pushiden): layr = self.layers.get(layriden) if layr is None: return pushs = layr.layrinfo.get('pushs') if pushs is None: return pdef = pushs.pop(pushiden, None) if pdef is None: return layr.layrinfo['pushs'] = pushs self.layerdefs.set(layr.iden, layr.layrinfo) await self.delActiveCoro(pushiden)
[docs] @s_nexus.Pusher.onPushAuto('layer:pull:add') async def addLayrPull(self, layriden, pdef): s_schemas.reqValidPull(pdef) iden = pdef.get('iden') layr = self.layers.get(layriden) if layr is None: return pulls = layr.layrinfo.get('pulls') if pulls is None: pulls = {} # handle last-message replay if pulls.get(iden) is not None: return pulls[iden] = pdef layr.layrinfo['pulls'] = pulls self.layerdefs.set(layr.iden, layr.layrinfo) await self.runLayrPull(layr, pdef)
[docs] @s_nexus.Pusher.onPushAuto('layer:pull:del') async def delLayrPull(self, layriden, pulliden): layr = self.layers.get(layriden) if layr is None: return pulls = layr.layrinfo.get('pulls') if pulls is None: return pdef = pulls.pop(pulliden, None) if pdef is None: return layr.layrinfo['pulls'] = pulls self.layerdefs.set(layr.iden, layr.layrinfo) await self.delActiveCoro(pulliden)
[docs] async def runLayrPush(self, layr, pdef): url = pdef.get('url') iden = pdef.get('iden') # push() will refire as needed async def push(): async with await self.boss.promote(f'layer push: {layr.iden} {iden}', self.auth.rootuser): async with await s_telepath.openurl(url) as proxy: await self._pushBulkEdits(layr, proxy, pdef) self.addActiveCoro(push, iden=iden)
[docs] async def runLayrPull(self, layr, pdef): url = pdef.get('url') iden = pdef.get('iden') # pull() will refire as needed async def pull(): async with await self.boss.promote(f'layer pull: {layr.iden} {iden}', self.auth.rootuser): async with await s_telepath.openurl(url) as proxy: await self._pushBulkEdits(proxy, layr, pdef) self.addActiveCoro(pull, iden=iden)
async def _pushBulkEdits(self, layr0, layr1, pdef): iden = pdef.get('iden') user = pdef.get('user') gvar = f'push:{iden}' # TODO Remove the defaults in 3.0.0 csize = pdef.get('chunk:size', s_const.layer_pdef_csize) qsize = pdef.get('queue:size', s_const.layer_pdef_qsize) async with await s_base.Base.anit() as base: queue = s_queue.Queue(maxsize=qsize) async def fill(): try: filloffs = await self.getStormVar(gvar, -1) async for item in layr0.syncNodeEdits(filloffs + 1, wait=True): await queue.put(item) await queue.close() except asyncio.CancelledError: # pragma: no cover raise except Exception as e: logger.exception(f'pushBulkEdits fill() error: {e}') await queue.close() base.schedCoro(fill()) async for chunk in queue.slices(): meta = {'time': s_common.now(), 'user': user} alledits = [] for offs, edits in chunk: # prevent push->push->push nodeedits growth alledits.extend(edits) if len(alledits) > csize: await layr1.storNodeEdits(alledits, meta) await self.setStormVar(gvar, offs) alledits.clear() if alledits: await layr1.storNodeEdits(alledits, meta) await self.setStormVar(gvar, offs) async def _checkNexsIndx(self): layroffs = [await layr.getEditIndx() for layr in list(self.layers.values())] if layroffs: maxindx = max(layroffs) if maxindx > await self.getNexsIndx(): await self.setNexsIndx(maxindx)
[docs] async def saveLayerNodeEdits(self, layriden, edits, meta): layr = self.reqLayer(layriden) return await layr.saveNodeEdits(edits, meta)
[docs] async def cloneLayer(self, iden, ldef=None): ''' Make a copy of a Layer in the cortex. Args: iden (str): Layer iden to clone ldef (Optional[Dict]): Layer configuration overrides Note: This should only be called with a reasonably static Cortex due to possible races. ''' layr = self.layers.get(iden, None) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {iden}', iden=iden) ldef = ldef or {} ldef['iden'] = s_common.guid() ldef.setdefault('creator', self.auth.rootuser.iden) return await self._push('layer:clone', iden, ldef)
@s_nexus.Pusher.onPush('layer:clone', passitem=True) async def _cloneLayer(self, iden, ldef, nexsitem): layr = self.layers.get(iden) if layr is None: return newiden = ldef.get('iden') if newiden in self.layers: return newpath = s_common.gendir(self.dirn, 'layers', newiden) await layr.clone(newpath) copyinfo = self.layerdefs.get(iden) for name, valu in copyinfo.items(): ldef.setdefault(name, valu) self.layerdefs.set(newiden, ldef) copylayr = await self._initLayr(ldef, nexsoffs=nexsitem[0]) creator = copyinfo.get('creator') user = await self.auth.reqUser(creator) await user.setAdmin(True, gateiden=newiden, logged=False) return ldef
[docs] def addStormCmd(self, ctor): ''' Add a synapse.lib.storm.Cmd class to the cortex. ''' if not s_grammar.isCmdName(ctor.name): raise s_exc.BadCmdName(name=ctor.name) self.stormcmds[ctor.name] = ctor
[docs] async def addStormDmon(self, ddef): ''' Add a storm dmon task. ''' if ddef.get('iden') is None: ddef['iden'] = s_common.guid() if await self.getStormDmon(ddef['iden']) is not None: mesg = f'Duplicate iden specified for dmon: {ddef["iden"]}' raise s_exc.DupIden(mesg=mesg) return await self._push('storm:dmon:add', ddef)
[docs] @s_nexus.Pusher.onPushAuto('storm:dmon:bump') async def bumpStormDmon(self, iden): ddef = self.stormdmondefs.get(iden) if ddef is None: return False if self.isactive: dmon = self.stormdmons.getDmon(iden) if dmon is not None: await dmon.bump() return True
async def _bumpUserDmons(self, iden): ''' Bump all the Dmons for a given user. Args: iden (str): User iden. ''' for dmoniden, ddef in list(self.stormdmondefs.items()): if ddef.get('user') == iden: await self.bumpStormDmon(dmoniden)
[docs] @s_nexus.Pusher.onPushAuto('storm:dmon:enable') async def enableStormDmon(self, iden): dmon = self.stormdmons.getDmon(iden) if dmon is None: return False if dmon.enabled: return False dmon.enabled = True dmon.ddef['enabled'] = True self.stormdmondefs.set(iden, dmon.ddef) if self.isactive: await dmon.run() return True
[docs] @s_nexus.Pusher.onPushAuto('storm:dmon:disable') async def disableStormDmon(self, iden): dmon = self.stormdmons.getDmon(iden) if dmon is None: return False if not dmon.enabled: return False dmon.enabled = False dmon.ddef['enabled'] = False self.stormdmondefs.set(iden, dmon.ddef) if self.isactive: await dmon.stop() return True
@s_nexus.Pusher.onPush('storm:dmon:add') async def _onAddStormDmon(self, ddef): iden = ddef['iden'] dmon = self.stormdmons.getDmon(iden) if dmon is not None: return dmon.pack() if ddef.get('user') is None: user = await self.auth.getUserByName('root') ddef['user'] = user.iden dmon = await self.runStormDmon(iden, ddef) self.stormdmondefs.set(iden, ddef) return dmon.pack()
[docs] async def delStormDmon(self, iden): ''' Stop and remove a storm dmon. ''' ddef = self.stormdmondefs.get(iden) if ddef is None: mesg = f'No storm daemon exists with iden {iden}.' raise s_exc.NoSuchIden(mesg=mesg) return await self._push('storm:dmon:del', iden)
@s_nexus.Pusher.onPush('storm:dmon:del') async def _delStormDmon(self, iden): ddef = self.stormdmondefs.pop(iden) if ddef is None: # pragma: no cover return await self.stormdmons.popDmon(iden)
[docs] def getStormCmd(self, name): return self.stormcmds.get(name)
[docs] async def runStormDmon(self, iden, ddef): # validate ddef before firing task s_storm.reqValidDdef(ddef) dmon = self.stormdmons.getDmon(iden) if dmon is not None: return dmon await self.auth.reqUser(ddef['user']) # raises if parser failure await self.getStormQuery(ddef.get('storm')) dmon = await self.stormdmons.addDmon(iden, ddef) return dmon
[docs] @s_cell.from_leader async def getStormDmon(self, iden): return self.stormdmons.getDmonDef(iden)
[docs] @s_cell.from_leader async def getStormDmons(self): return self.stormdmons.getDmonDefs()
[docs] @s_cell.from_leader async def getStormDmonLog(self, iden): return self.stormdmons.getDmonRunlog(iden)
[docs] def addStormLib(self, path, ctor): self.stormlibs.append((path, ctor)) root = self.libroot # (name, {kids}, {funcs}) for name in path: step = root[1].get(name) if step is None: step = (name, {}, {}) root[1][name] = step root = step root[2]['ctor'] = ctor
[docs] def getStormLib(self, path): root = self.libroot for name in path: step = root[1].get(name) if step is None: return None root = step return root
[docs] def getStormCmds(self): return list(self.stormcmds.items())
[docs] async def getAxon(self): await self.axready.wait() return self.axon.iden
[docs] def setFeedFunc(self, name, func): ''' Set a data ingest function. def func(snap, items): loaditems... ''' self.feedfuncs[name] = func
[docs] def getFeedFunc(self, name): ''' Get a data ingest function. ''' return self.feedfuncs.get(name)
[docs] async def getFeedFuncs(self): ret = [] for name, ctor in self.feedfuncs.items(): # TODO - Future support for feed functions defined via Storm. doc = getattr(ctor, '__doc__', None) if doc is None: doc = 'No feed docstring' doc = doc.strip() desc = doc.split('\n')[0] ret.append({'name': name, 'desc': desc, 'fulldoc': doc, }) return tuple(ret)
async def _addSynNodes(self, snap, items): ''' Add nodes to the Cortex via the packed node format. ''' async for node in snap.addNodes(items): pass
[docs] async def setUserLocked(self, iden, locked): retn = await s_cell.Cell.setUserLocked(self, iden, locked) await self._bumpUserDmons(iden) return retn
[docs] def getCoreMod(self, name): return self.modules.get(name)
[docs] def getCoreMods(self): ret = [] for modname, mod in self.modules.items(): ret.append((modname, mod.conf)) return ret
def _initStormOpts(self, opts): if opts is None: opts = {} opts.setdefault('user', self.auth.rootuser.iden) return opts def _viewFromOpts(self, opts, user=None): if user is None: user = self._userFromOpts(opts) viewiden = opts.get('view') if viewiden is None: viewiden = user.profile.get('cortex:view') if viewiden is None: viewiden = self.view.iden # For backwards compatibility, resolve references to old view iden == cortex.iden to the main view # TODO: due to our migration policy, remove in 3.0.0 if viewiden == self.iden: # pragma: no cover viewiden = self.view.iden view = self.views.get(viewiden) if view is None: raise s_exc.NoSuchView(mesg=f'No such view iden={viewiden}', iden=viewiden) user.confirm(('view', 'read'), gateiden=viewiden) return view def _userFromOpts(self, opts): if opts is None: return self.auth.rootuser useriden = opts.get('user') if useriden is None: return self.auth.rootuser user = self.auth.user(useriden) if user is None: mesg = f'No user found with iden: {useriden}' raise s_exc.NoSuchUser(mesg=mesg, user=useriden) return user
[docs] async def count(self, text, opts=None): opts = self._initStormOpts(opts) if self.stormpool is not None and opts.get('mirror', True): extra = await self.getLogExtra(text=text) proxy = await self._getMirrorProxy(opts) if proxy is not None: logger.info(f'Offloading Storm query {{{text}}} to mirror.', extra=extra) mirropts = await self._getMirrorOpts(opts) try: return await proxy.count(text, opts=mirropts) except s_exc.TimeOut: mesg = 'Timeout waiting for query mirror, running locally instead.' logger.warning(mesg) if (nexsoffs := opts.get('nexsoffs')) is not None: if not await self.waitNexsOffs(nexsoffs, timeout=opts.get('nexstimeout')): raise s_exc.TimeOut(mesg=f'Timeout waiting for nexus offset {nexsoffs} in count()') view = self._viewFromOpts(opts) i = 0 async for _ in view.eval(text, opts=opts): i += 1 return i
async def _getMirrorOpts(self, opts): assert 'nexsoffs' in opts mirropts = s_msgpack.deepcopy(opts) mirropts['mirror'] = False mirropts['nexstimeout'] = self.stormpoolopts.get('timeout:sync') return mirropts async def _getMirrorProxy(self, opts): if self.stormpool is None: # pragma: no cover return None if self.stormpool.size() == 0: logger.warning('Storm query mirror pool is empty, running query locally.') return None proxy = None try: timeout = self.stormpoolopts.get('timeout:connection') proxy = await self.stormpool.proxy(timeout=timeout) proxyname = proxy._ahainfo.get('name') if proxyname is not None and proxyname == self.ahasvcname: # we are part of the pool and were selected. Convert to local use. return None curoffs = opts.setdefault('nexsoffs', await self.getNexsIndx() - 1) miroffs = await s_common.wait_for(proxy.getNexsIndx(), timeout) - 1 if (delta := curoffs - miroffs) > MAX_NEXUS_DELTA: mesg = (f'Pool mirror [{proxyname}] Nexus offset delta too large ' f'({delta} > {MAX_NEXUS_DELTA}), running query locally.') logger.warning(mesg, extra=await self.getLogExtra(delta=delta, mirror=proxyname, mirror_offset=miroffs)) return None return proxy except (TimeoutError, s_exc.IsFini): if proxy is None: logger.warning('Timeout waiting for pool mirror, running query locally.') else: mesg = f'Timeout waiting for pool mirror [{proxyname}] Nexus offset, running query locally.' logger.warning(mesg, extra=await self.getLogExtra(mirror=proxyname)) await proxy.fini() return None
[docs] async def storm(self, text, opts=None): opts = self._initStormOpts(opts) if self.stormpool is not None and opts.get('mirror', True): extra = await self.getLogExtra(text=text) proxy = await self._getMirrorProxy(opts) if proxy is not None: logger.info(f'Offloading Storm query {{{text}}} to mirror.', extra=extra) mirropts = await self._getMirrorOpts(opts) try: async for mesg in proxy.storm(text, opts=mirropts): yield mesg return except s_exc.TimeOut: mesg = 'Timeout waiting for query mirror, running locally instead.' logger.warning(mesg) if (nexsoffs := opts.get('nexsoffs')) is not None: if not await self.waitNexsOffs(nexsoffs, timeout=opts.get('nexstimeout')): raise s_exc.TimeOut(mesg=f'Timeout waiting for nexus offset {nexsoffs} in storm().') view = self._viewFromOpts(opts) async for mesg in view.storm(text, opts=opts): yield mesg
[docs] async def callStorm(self, text, opts=None): opts = self._initStormOpts(opts) if self.stormpool is not None and opts.get('mirror', True): extra = await self.getLogExtra(text=text) proxy = await self._getMirrorProxy(opts) if proxy is not None: logger.info(f'Offloading Storm query {{{text}}} to mirror.', extra=extra) mirropts = await self._getMirrorOpts(opts) try: return await proxy.callStorm(text, opts=mirropts) except s_exc.TimeOut: mesg = 'Timeout waiting for query mirror, running locally instead.' logger.warning(mesg) if (nexsoffs := opts.get('nexsoffs')) is not None: if not await self.waitNexsOffs(nexsoffs, timeout=opts.get('nexstimeout')): raise s_exc.TimeOut(mesg=f'Timeout waiting for nexus offset {nexsoffs} in callStorm().') view = self._viewFromOpts(opts) return await view.callStorm(text, opts=opts)
[docs] async def exportStorm(self, text, opts=None): opts = self._initStormOpts(opts) if self.stormpool is not None and opts.get('mirror', True): extra = await self.getLogExtra(text=text) proxy = await self._getMirrorProxy(opts) if proxy is not None: logger.info(f'Offloading Storm query {{{text}}} to mirror.', extra=extra) mirropts = await self._getMirrorOpts(opts) try: async for mesg in proxy.exportStorm(text, opts=mirropts): yield mesg return except s_exc.TimeOut: mesg = 'Timeout waiting for query mirror, running locally instead.' logger.warning(mesg) if (nexsoffs := opts.get('nexsoffs')) is not None: if not await self.waitNexsOffs(nexsoffs, timeout=opts.get('nexstimeout')): raise s_exc.TimeOut(mesg=f'Timeout waiting for nexus offset {nexsoffs} in exportStorm().') user = self._userFromOpts(opts) view = self._viewFromOpts(opts) taskinfo = {'query': text, 'view': view.iden} taskiden = opts.get('task') await self.boss.promote('storm:export', user=user, info=taskinfo, taskiden=taskiden) with s_scope.enter({'user': user}): async with await s_spooled.Dict.anit(dirn=self.dirn, cell=self) as spooldict: async with await self.snap(user=user, view=view) as snap: async for pode in snap.iterStormPodes(text, opts=opts): await spooldict.set(pode[1]['iden'], pode) await asyncio.sleep(0) for iden, pode in spooldict.items(): await asyncio.sleep(0) edges = [] async for verb, n2iden in snap.iterNodeEdgesN1(s_common.uhex(iden)): await asyncio.sleep(0) if not spooldict.has(n2iden): continue edges.append((verb, n2iden)) if edges: pode[1]['edges'] = edges yield pode
[docs] async def exportStormToAxon(self, text, opts=None): async with await self.axon.upload() as fd: async for pode in self.exportStorm(text, opts=opts): await fd.write(s_msgpack.en(pode)) size, sha256 = await fd.save() return (size, s_common.ehex(sha256))
[docs] async def feedFromAxon(self, sha256, opts=None): opts = self._initStormOpts(opts) user = self._userFromOpts(opts) view = self._viewFromOpts(opts) taskiden = opts.get('task') taskinfo = {'name': 'syn.nodes', 'sha256': sha256, 'view': view.iden} await self.boss.promote('feeddata', user=user, info=taskinfo, taskiden=taskiden) # ensure that the user can make all node edits in the layer user.confirm(('node',), gateiden=view.layers[0].iden) q = s_queue.Queue(maxsize=10000) feedexc = None async with await s_base.Base.anit() as base: async def fill(): nonlocal feedexc try: async for item in self.axon.iterMpkFile(sha256): await q.put(item) except Exception as e: logger.exception(f'feedFromAxon.fill(): {e}') feedexc = e finally: await q.close() base.schedCoro(fill()) count = 0 async with await self.snap(user=user, view=view) as snap: # feed the items directly to syn.nodes async for items in q.slices(size=100): async for node in snap.addNodes(items): count += 1 if feedexc is not None: raise feedexc return count
[docs] async def nodes(self, text, opts=None): ''' A simple non-streaming way to return a list of nodes. ''' if self.isfini: # pragma: no cover raise s_exc.IsFini() opts = self._initStormOpts(opts) view = self._viewFromOpts(opts) return await view.nodes(text, opts=opts)
[docs] async def stormlist(self, text, opts=None): return [m async for m in self.storm(text, opts=opts)]
async def _getStormEval(self, text): try: astvalu = copy.deepcopy(await s_parser.evalcache.aget(text)) except s_exc.FatalErr: logger.exception(f'Fatal error while parsing [{text}]', extra={'synapse': {'text': text}}) await self.fini() raise astvalu.init(self) return astvalu async def _getStormQuery(self, args): try: query = copy.deepcopy(await s_parser.querycache.aget(args)) except s_exc.FatalErr: logger.exception(f'Fatal error while parsing [{args}]', extra={'synapse': {'text': args[0]}}) await self.fini() raise query.init(self) await asyncio.sleep(0) return query
[docs] async def getStormQuery(self, text, mode='storm'): return await self.querycache.aget((text, mode))
[docs] @contextlib.asynccontextmanager async def getStormRuntime(self, query, opts=None): opts = self._initStormOpts(opts) view = self._viewFromOpts(opts) user = self._userFromOpts(opts) async with await self.snap(user=user, view=view) as snap: async with snap.getStormRuntime(query, opts=opts, user=user) as runt: yield runt
[docs] async def reqValidStorm(self, text, opts=None): ''' Parse a storm query to validate it. Args: text (str): The text of the Storm query to parse. opts (dict): A Storm options dictionary. Returns: True: If the query is valid. Raises: BadSyntaxError: If the query is invalid. ''' if opts is None: opts = {} mode = opts.get('mode', 'storm') await self.getStormQuery(text, mode=mode) return True
def _logStormQuery(self, text, user, info=None): ''' Log a storm query. ''' if self.stormlog: if info is None: info = {} info['text'] = text info['username'] = user.name info['user'] = user.iden stormlogger.log(self.stormloglvl, 'Executing storm query {%s} as [%s]', text, user.name, extra={'synapse': info})
[docs] async def getNodeByNdef(self, ndef, view=None): ''' Return a single Node() instance by (form,valu) tuple. ''' name, valu = ndef form = self.model.forms.get(name) if form is None: raise s_exc.NoSuchForm.init(name) norm, info = form.type.norm(valu) buid = s_common.buid((form.name, norm)) async with await self.snap(view=view) as snap: return await snap.getNodeByBuid(buid)
[docs] def getCoreInfo(self): '''This API is deprecated.''' s_common.deprecated('Cortex.getCoreInfo') return { 'version': synapse.version, 'modeldef': self.model.getModelDefs(), 'stormcmds': {cmd: {} for cmd in self.stormcmds.keys()}, }
[docs] async def getCoreInfoV2(self): return { 'version': synapse.version, 'modeldict': await self.getModelDict(), 'stormdocs': await self.getStormDocs(), }
[docs] async def getStormDocs(self): ''' Get a struct containing the Storm Types documentation. Returns: dict: A Dictionary of storm documentation information. ''' cmds = [] for name, cmd in self.stormcmds.items(): entry = { 'name': name, 'doc': cmd.getCmdBrief(), } if cmd.pkgname: entry['package'] = cmd.pkgname if cmd.svciden: entry['svciden'] = cmd.svciden cmds.append(entry) ret = { 'libraries': s_stormtypes.registry.getLibDocs(), 'types': s_stormtypes.registry.getTypeDocs(), 'commands': cmds, # 'packages': ... # TODO - Support inline information for packages? } return ret
[docs] async def addNodes(self, nodedefs, view=None): ''' Quickly add/modify a list of nodes from node definition tuples. This API is the simplest/fastest way to add nodes, set node props, and add tags to nodes remotely. Args: nodedefs (list): A list of node definition tuples. See below. A node definition tuple is defined as: ( (form, valu), {'props':{}, 'tags':{}) The "props" or "tags" keys may be omitted. ''' async with await self.snap(view=view) as snap: snap.strict = False async for node in snap.addNodes(nodedefs): yield node
[docs] async def addFeedData(self, name, items, *, viewiden=None): ''' Add data using a feed/parser function. Args: name (str): The name of the feed record format. items (list): A list of items to ingest. viewiden (str): The iden of a view to use. If a view is not specified, the default view is used. ''' view = self.getView(viewiden) if view is None: raise s_exc.NoSuchView(mesg=f'No such view iden={viewiden}', iden=viewiden) async with await self.snap(view=view) as snap: snap.strict = False await snap.addFeedData(name, items)
[docs] async def snap(self, user=None, view=None): ''' Return a transaction object for the default view. Args: user (str): The user to get the snap for. view (View): View object to use when making the snap. Notes: This must be used as an asynchronous context manager. Returns: s_snap.Snap: A Snap object for the view. ''' if view is None: view = self.view if user is None: user = await self.auth.getUserByName('root') snap = await view.snap(user) return snap
[docs] async def loadCoreModule(self, ctor, conf=None): ''' Load a single cortex module with the given ctor and conf. Args: ctor (str): The python module class path conf (dict):Config dictionary for the module ''' if conf is None: conf = {} modu = self._loadCoreModule(ctor, conf=conf) try: await s_coro.ornot(modu.preCoreModule) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'module preCoreModule failed: {ctor}') self.modules.pop(ctor, None) return mdefs = modu.getModelDefs() self.model.addDataModels(mdefs) cmds = modu.getStormCmds() [self.addStormCmd(c) for c in cmds] try: await s_coro.ornot(modu.initCoreModule) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'module initCoreModule failed: {ctor}') self.modules.pop(ctor, None) return await self.fire('core:module:load', module=ctor) return modu
async def _loadCoreMods(self): mods = [] cmds = [] mdefs = [] for ctor in list(s_modules.coremods): await self._preLoadCoreModule(ctor, mods, cmds, mdefs) for ctor in self.conf.get('modules'): await self._preLoadCoreModule(ctor, mods, cmds, mdefs, custom=True) self.model.addDataModels(mdefs) [self.addStormCmd(c) for c in cmds] async def _preLoadCoreModule(self, ctor, mods, cmds, mdefs, custom=False): conf = None # allow module entry to be (ctor, conf) tuple if isinstance(ctor, (list, tuple)): ctor, conf = ctor modu = self._loadCoreModule(ctor, conf=conf) if modu is None: return mods.append(modu) try: await s_coro.ornot(modu.preCoreModule) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'module preCoreModule failed: {ctor}') self.modules.pop(ctor, None) return cmds.extend(modu.getStormCmds()) model_defs = modu.getModelDefs() if custom: for _mdef, mnfo in model_defs: mnfo['custom'] = True mdefs.extend(model_defs) async def _initCoreMods(self): for ctor, modu in list(self.modules.items()): try: await s_coro.ornot(modu.initCoreModule) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise except Exception: logger.exception(f'module initCoreModule failed: {ctor}') self.modules.pop(ctor, None) def _loadCoreModule(self, ctor, conf=None): if ctor in self.modules: raise s_exc.ModAlreadyLoaded(mesg=f'{ctor} already loaded') try: modu = s_dyndeps.tryDynFunc(ctor, self, conf=conf) self.modules[ctor] = modu return modu except Exception: logger.exception('mod load fail: %s' % (ctor,)) return None
[docs] async def getPropNorm(self, prop, valu, typeopts=None): ''' Get the normalized property value based on the Cortex data model. Args: prop (str): The property to normalize. valu: The value to normalize. typeopts: A Synapse type opts dictionary used to further normalize the value. Returns: (tuple): A two item tuple, containing the normed value and the info dictionary. Raises: s_exc.NoSuchProp: If the prop does not exist. s_exc.BadTypeValu: If the value fails to normalize. ''' pobj = self.model.prop(prop) if pobj is None: raise s_exc.NoSuchProp(mesg=f'The property {prop} does not exist.', prop=prop) tobj = pobj.type if typeopts: tobj = tobj.clone(typeopts) norm, info = tobj.norm(valu) return norm, info
[docs] async def getTypeNorm(self, name, valu, typeopts=None): ''' Get the normalized type value based on the Cortex data model. Args: name (str): The type to normalize. valu: The value to normalize. typeopts: A Synapse type opts dictionary used to further normalize the value. Returns: (tuple): A two item tuple, containing the normed value and the info dictionary. Raises: s_exc.NoSuchType: If the type does not exist. s_exc.BadTypeValu: If the value fails to normalize. ''' tobj = self.model.type(name) if tobj is None: raise s_exc.NoSuchType(mesg=f'The type {name} does not exist.', name=name) if typeopts: tobj = tobj.clone(typeopts) norm, info = tobj.norm(valu) return norm, info
@staticmethod def _convert_reqdict(reqdict): return {s_agenda.TimeUnit.fromString(k): v for (k, v) in reqdict.items()}
[docs] async def addCronJob(self, cdef): ''' Add a cron job to the cortex. Convenience wrapper around agenda.add A cron job is a persistently-stored item that causes storm queries to be run in the future. The specification for the times that the queries run can be one-shot or recurring. Args: query (str): The storm query to execute in the future reqs (Union[Dict[str, Union[int, List[int]]], List[Dict[...]]]): Either a dict of the fixed time fields or a list of such dicts. The keys are in the set ('year', 'month', 'dayofmonth', 'dayofweek', 'hour', 'minute'. The values must be positive integers, except for the key of 'dayofmonth' in which it may also be a negative integer which represents the number of days from the end of the month with -1 representing the last day of the month. All values may also be lists of valid values. incunit (Optional[str]): A member of the same set as above, with an additional member 'day'. If is None (default), then the appointment is one-shot and will not recur. incvals (Union[int, List[int]): A integer or a list of integers of the number of units Returns (bytes): An iden that can be used to later modify, query, and delete the job. Notes: reqs must have fields present or incunit must not be None (or both) The incunit if not None it must be larger in unit size than all the keys in all reqs elements. Non-recurring jobs may also have a req of 'now' which will cause the job to also execute immediately. ''' s_schemas.reqValidCronDef(cdef) iden = cdef.get('iden') appt = self.agenda.appts.get(iden) if appt is not None: raise s_exc.DupIden(mesg=f'Duplicate cron iden ({iden})') incunit = cdef.get('incunit') reqs = cdef.get('reqs') try: if incunit is not None: if isinstance(incunit, (list, tuple)): incunit = [s_agenda.TimeUnit.fromString(i) for i in incunit] else: incunit = s_agenda.TimeUnit.fromString(incunit) cdef['incunit'] = incunit if isinstance(reqs, Mapping): reqs = self._convert_reqdict(reqs) else: reqs = [self._convert_reqdict(req) for req in reqs] if incunit is not None and s_agenda.TimeUnit.NOW in reqs: mesg = "Recurring jobs may not be scheduled to run 'now'" raise s_exc.BadConfValu(mesg) cdef['reqs'] = reqs except KeyError: raise s_exc.BadConfValu('Unrecognized time unit') if not cdef.get('iden'): cdef['iden'] = s_common.guid() cdef['created'] = s_common.now() opts = {'user': cdef['creator'], 'view': cdef.get('view')} view = self._viewFromOpts(opts) cdef['view'] = view.iden return await self._push('cron:add', cdef)
@s_nexus.Pusher.onPush('cron:add') async def _onAddCronJob(self, cdef): iden = cdef['iden'] appt = self.agenda.appts.get(iden) if appt is not None: return appt.pack() self.auth.reqNoAuthGate(iden) user = await self.auth.reqUser(cdef['creator']) cdef = await self.agenda.add(cdef) await self.auth.addAuthGate(iden, 'cronjob') await user.setAdmin(True, gateiden=iden, logged=False) await self.feedBeholder('cron:add', cdef, gates=[iden]) return cdef
[docs] async def moveCronJob(self, useriden, croniden, viewiden): view = self._viewFromOpts({'view': viewiden, 'user': useriden}) appt = self.agenda.appts.get(croniden) if appt is None: raise s_exc.NoSuchIden(iden=croniden) if appt.view == view.iden: return croniden return await self._push('cron:move', croniden, viewiden)
@s_nexus.Pusher.onPush('cron:move') async def _onMoveCronJob(self, croniden, viewiden): await self.agenda.move(croniden, viewiden) await self.feedBeholder('cron:move', {'iden': croniden, 'view': viewiden}, gates=[croniden]) return croniden
[docs] @s_nexus.Pusher.onPushAuto('cron:del') async def delCronJob(self, iden): ''' Delete a cron job Args: iden (bytes): The iden of the cron job to be deleted ''' await self._killCronTask(iden) try: await self.agenda.delete(iden) except s_exc.NoSuchIden: return await self.feedBeholder('cron:del', {'iden': iden}, gates=[iden]) await self.auth.delAuthGate(iden)
[docs] @s_nexus.Pusher.onPushAuto('cron:mod') async def updateCronJob(self, iden, query): ''' Change an existing cron job's query Args: iden (bytes): The iden of the cron job to be changed ''' await self.agenda.mod(iden, query) await self.feedBeholder('cron:edit:query', {'iden': iden, 'query': query}, gates=[iden])
[docs] @s_nexus.Pusher.onPushAuto('cron:enable') async def enableCronJob(self, iden): ''' Enable a cron job Args: iden (bytes): The iden of the cron job to be changed ''' await self.agenda.enable(iden) await self.feedBeholder('cron:enable', {'iden': iden}, gates=[iden]) logger.info(f'Enabled cron job {iden}', extra=await self.getLogExtra(iden=iden, status='MODIFY'))
[docs] @s_nexus.Pusher.onPushAuto('cron:disable') async def disableCronJob(self, iden): ''' Enable a cron job Args: iden (bytes): The iden of the cron job to be changed ''' await self.agenda.disable(iden) await self._killCronTask(iden) await self.feedBeholder('cron:disable', {'iden': iden}, gates=[iden]) logger.info(f'Disabled cron job {iden}', extra=await self.getLogExtra(iden=iden, status='MODIFY'))
[docs] async def killCronTask(self, iden): if self.agenda.appts.get(iden) is None: return False return await self._push('cron:task:kill', iden)
@s_nexus.Pusher.onPush('cron:task:kill') async def _killCronTask(self, iden): appt = self.agenda.appts.get(iden) if appt is None: return False task = appt.task if task is None: return False self.schedCoro(task.kill()) return True
[docs] async def listCronJobs(self): ''' Get information about all the cron jobs accessible to the current user ''' crons = [] for _, cron in self.agenda.list(): info = cron.pack() user = self.auth.user(cron.creator) if user is not None: info['username'] = user.name crons.append(info) return crons
[docs] @s_nexus.Pusher.onPushAuto('cron:edit') async def editCronJob(self, iden, name, valu): ''' Modify a cron job definition. ''' appt = await self.agenda.get(iden) # TODO make this generic and check cdef if name == 'creator': await self.auth.reqUser(valu) appt.creator = valu elif name == 'name': appt.name = str(valu) elif name == 'doc': appt.doc = str(valu) elif name == 'pool': appt.pool = bool(valu) else: mesg = f'editCronJob name {name} is not supported for editing.' raise s_exc.BadArg(mesg=mesg) await appt.save() pckd = appt.pack() await self.feedBeholder(f'cron:edit:{name}', {'iden': iden, name: pckd.get(name)}, gates=[iden]) return pckd
[docs] @s_nexus.Pusher.onPushAuto('cron:edits') async def addCronEdits(self, iden, edits): ''' Take a dictionary of edits and apply them to the appointment (cron job) ''' appt = await self.agenda.get(iden) await appt.edits(edits)
[docs] @contextlib.asynccontextmanager async def enterMigrationMode(self): async with self._migration_lock: self.migration = True yield self.migration = False
[docs] async def iterFormRows(self, layriden, form, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes of a single form, optionally (re)starting at startvalu. Args: layriden (str): Iden of the layer to retrieve the nodes form (str): A form name. stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' layr = self.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) async for item in layr.iterFormRows(form, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterPropRows(self, layriden, form, prop, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes with a particular secondary property, optionally (re)starting at startvalu. Args: layriden (str): Iden of the layer to retrieve the nodes form (str): A form name. prop (str): A universal property name. stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' layr = self.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) async for item in layr.iterPropRows(form, prop, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterUnivRows(self, layriden, prop, stortype=None, startvalu=None): ''' Yields buid, valu tuples of nodes with a particular universal property, optionally (re)starting at startvalu. Args: layriden (str): Iden of the layer to retrieve the nodes prop (str): A universal property name. stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' layr = self.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) async for item in layr.iterUnivRows(prop, stortype=stortype, startvalu=startvalu): yield item
[docs] async def iterTagRows(self, layriden, tag, form=None, starttupl=None): ''' Yields (buid, (valu, form)) values that match a tag and optional form, optionally (re)starting at starttupl. Args: layriden (str): Iden of the layer to retrieve the nodes tag (str): the tag to match form (Optional[str]): if present, only yields buids of nodes that match the form. starttupl (Optional[Tuple[buid, form]]): if present, (re)starts the stream of values there. Returns: AsyncIterator[Tuple(buid, (valu, form))] Note: This yields (buid, (tagvalu, form)) instead of just buid, valu in order to allow resuming an interrupted call by feeding the last value retrieved into starttupl ''' layr = self.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) async for item in layr.iterTagRows(tag, form=form, starttupl=starttupl): yield item
[docs] async def iterTagPropRows(self, layriden, tag, prop, form=None, stortype=None, startvalu=None): ''' Yields (buid, valu) that match a tag:prop, optionally (re)starting at startvalu. Args: layriden (str): Iden of the layer to retrieve the nodes tag (str): tag name prop (str): prop name form (Optional[str]): optional form name stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop startvalu (Any): The value to start at. May only be not None if stortype is not None. Returns: AsyncIterator[Tuple(buid, valu)] ''' layr = self.getLayer(layriden) if layr is None: raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden) async for item in layr.iterTagPropRows(tag, prop, form=form, stortype=stortype, startvalu=startvalu): yield item
def _initVaults(self): self.vaultsdb = self.slab.initdb('vaults') # { idenb: s_msgpack.en(vault), ... } self.vaultsbynamedb = self.slab.initdb('vaults:byname') # { name.encode(): idenb, ... } # TSI = type, scope, iden. This is used to deconflict uniqueness of # scoped vaults without requiring a bunch of other indexes. self.vaultsbyTSIdb = self.slab.initdb('vaults:byTSI') # { TSI.encode(): idenb, ... } def _getVaults(self): ''' Slab helper function for getting all vaults. ''' genr = self.slab.scanByFull(db=self.vaultsdb) for idenb, byts in genr: vault = s_msgpack.un(byts) yield vault def _getVaultByBidn(self, bidn): ''' Slab helper function for getting a vault by iden (bytes). ''' byts = self.slab.get(bidn, db=self.vaultsdb) if byts is None: return None vault = s_msgpack.un(byts) return vault def _getVaultByTSI(self, vtype, scope, iden): ''' Slab helper function for getting a vault by type,scope,iden. ''' if scope == 'global': tsi = f'{vtype}:global' elif scope in ('user', 'role'): tsi = f'{vtype}:{scope}:{iden}' else: raise s_exc.BadArg(mesg=f'Invalid scope: {scope}.') bidn = self.slab.get(tsi.encode(), db=self.vaultsbyTSIdb) if bidn is None: return None return self._getVaultByBidn(bidn)
[docs] def getVault(self, iden): ''' Get a vault. Args: iden (str): Iden of the vault to get. Returns: vault or None ''' if not s_common.isguid(iden): return None bidn = s_common.uhex(iden) return self._getVaultByBidn(bidn)
[docs] def getVaultByName(self, name): ''' Get a vault by name. Args: name (str): Name of the vault to get. Returns: vault or None ''' bidn = self.slab.get(name.encode(), db=self.vaultsbynamedb) if bidn is None: return None return self._getVaultByBidn(bidn)
[docs] def getVaultByType(self, vtype, useriden, scope=None): ''' Get a vault of type `vtype` and scope `scope` for user with `iden`. This function allows the caller to retrieve a vault of the specified `vtype` by searching for the first available vault that matches the `vtype` and `scope` criteria. The search order for opening vaults is as follows: - If `scope` is specified, return the vault with `vtype` and `scope`. Return None if such a vault doesn't exist. - Check 'user' scope for a vault of `vtype`. Continue if non-existent. - Check 'role' scope for a vault of `vtype`. Continue if non-existent. - Check 'global' scope for a vault of `vtype`. Continue if non-existent. - Return None Args: vtype (str): Type of the vault to open. useriden (str): Iden of user trying to open the vault. scope (str|None): The vault scope to open. Raises: synapse.exc.BadArg: Invalid scope specified. Returns: vault or None if matching vault could not be found. ''' if scope not in (None, 'user', 'role', 'global'): raise s_exc.BadArg(mesg=f'Invalid scope: {scope}.') def _getVault(_scope): vault = None if _scope == 'user': vault = self._getVaultByTSI(vtype, _scope, useriden) elif _scope == 'role': user = self.auth.user(useriden) if user is None: mesg = f'No user with iden {useriden}.' raise s_exc.NoSuchUser(mesg=mesg, user=useriden) for role in user.getRoles(): vault = self._getVaultByTSI(vtype, _scope, role.iden) if vault: if not self._hasEasyPerm(vault, user, s_cell.PERM_READ): vault = None continue break elif _scope == 'global': vault = self._getVaultByTSI(vtype, _scope, None) return vault # If caller specified a scope, return that vault if it exists if scope is not None: return _getVault(scope) # Finally, try the user, role, and global vaults in order for _scope in ('user', 'role', 'global'): vault = _getVault(_scope) if vault: return vault return None
[docs] def reqVault(self, iden): ''' Get a vault by iden. Args: iden (str): Iden of the vault to get. Raises: synapse.exc.NoSuchIden: Vault with `iden` not found. Returns: vault ''' if not s_common.isguid(iden): raise s_exc.BadArg(mesg=f'Iden is not a valid iden: {iden}.') vault = self.getVault(iden) if vault is None: raise s_exc.NoSuchIden(mesg=f'Vault not found for iden: {iden}.') return vault
[docs] def reqVaultByName(self, name): ''' Get a vault by name. Args: name (str): Name of the vault to get. Raises: synapse.exc.NoSuchName: Vault with `name` not found. Returns: vault ''' vault = self.getVaultByName(name) if vault is None: raise s_exc.NoSuchName(mesg=f'Vault not found for name: {name}.') return vault
[docs] def reqVaultByType(self, vtype, iden, scope=None): ''' Get a vault by type. Args: vtype (str): Type of the vault to get. iden (str): Iden of the user or role for the vault type. scope (str|None): Scope of the vault to get. Raises: synapse.exc.NoSuchName: Vault with `vtype`/`iden`/`scope` not found. Returns: vault ''' vault = self.getVaultByType(vtype, iden, scope) if vault is None: raise s_exc.NoSuchName(mesg=f'Vault not found for type: {vtype}.') return vault
[docs] async def addVault(self, vdef): ''' Create a new vault. Args: vdef (dict): The vault to add. Raises: synapse.exc.SchemaViolation: `vdef` does not conform to the vault schema. synapse.exc.DupName: - Vault already exists for type/scope/owner. - Vault already exists with specified name. synapse.exc.BadArg: - Invalid vault definition provided. - Owner required for unscoped, user, and role vaults. - Vault secrets is not msgpack safe. - Vault configs is not msgpack safe. Returns: iden of new vault ''' if not isinstance(vdef, dict): raise s_exc.BadArg(mesg='Invalid vault definition provided.') # Set some standard properties on the vdef before validating vdef['iden'] = s_common.guid() if 'permissions' in vdef: vdef.pop('permissions') self._initEasyPerm(vdef, default=s_cell.PERM_DENY) vault = s_schemas.reqValidVault(vdef) scope = vault.get('scope') vtype = vault.get('type') owner = vault.get('owner') name = vault.get('name') if owner is None and scope != 'global': raise s_exc.BadArg(mesg='Owner required for unscoped, user, and role vaults.') # Make sure the type/scope/owner combination is unique. Not for unscoped vaults if scope is not None and self._getVaultByTSI(vtype, scope, owner) is not None: raise s_exc.DupName(mesg=f'Vault already exists for type {vtype}, scope {scope}, owner {owner}.') # Make sure the requested name is unique if self.getVaultByName(name) is not None: raise s_exc.DupName(mesg=f'Vault {name} already exists.') secrets = vault.get('secrets') configs = vault.get('configs') try: s_msgpack.en(secrets) except s_exc.NotMsgpackSafe as exc: raise s_exc.BadArg(mesg=f'Vault secrets must be msgpack safe.') from None try: s_msgpack.en(configs) except s_exc.NotMsgpackSafe as exc: raise s_exc.BadArg(mesg=f'Vault configs must be msgpack safe.') from None if scope == 'global': # everyone gets read access await self._setEasyPerm(vault, 'roles', self.auth.allrole.iden, s_cell.PERM_READ) elif scope == 'user': user = self.auth.user(owner) if user is None: raise s_exc.NoSuchUser(mesg=f'User with iden {owner} not found.') # The user is the admin, everyone else no access await self._setEasyPerm(vault, 'users', owner, s_cell.PERM_ADMIN) elif scope == 'role': role = self.auth.role(owner) if role is None: raise s_exc.NoSuchRole(mesg=f'Role with iden {owner} not found.') # role members gets read access await self._setEasyPerm(vault, 'roles', owner, s_cell.PERM_READ) else: # Unscoped vaults # The creator gets admin, everyone else no access await self._setEasyPerm(vault, 'users', owner, s_cell.PERM_ADMIN) return await self._push('vault:add', vault)
@s_nexus.Pusher.onPush('vault:add') async def _addVault(self, vault): iden = vault.get('iden') name = vault.get('name') scope = vault.get('scope') bidn = s_common.uhex(iden) if scope is not None: vtype = vault.get('type') owner = vault.get('owner') if scope == 'global': tsi = f'{vtype}:global' else: tsi = f'{vtype}:{scope}:{owner}' self.slab.put(tsi.encode(), bidn, db=self.vaultsbyTSIdb) self.slab.put(name.encode(), bidn, db=self.vaultsbynamedb) self.slab.put(bidn, s_msgpack.en(vault), db=self.vaultsdb) return iden
[docs] async def setVaultSecrets(self, iden, key, valu): ''' Set vault secret item. This function sets the `key`:`valu` into the vault secrets. Args: iden (str): The iden of the vault to edit. key (str): Vault secret key. valu (str): Vault secret value. s_common.novalu to delete a key. Raises: synapse.exc.NoSuchIden: Vault with `iden` does not exist. synapse.exc.NotMsgpackSafe: One of `key` or `valu` is not msgpack safe. Returns: Updated vault. ''' vault = self.reqVault(iden) secrets = vault.get('secrets') delete = False if valu is s_common.novalu: if key not in secrets: raise s_exc.BadArg(mesg=f'Key {key} not found in vault secrets.') valu = None delete = True else: try: s_msgpack.en({key: valu}) except s_exc.NotMsgpackSafe as exc: raise s_exc.NotMsgpackSafe(mesg=f'Vault secrets must be msgpack safe.') from None return await self._push('vault:data:set', iden, 'secrets', key, valu, delete)
[docs] async def setVaultConfigs(self, iden, key, valu): ''' Set vault config item. This function sets the `key`:`valu` into the vault configs. Args: iden (str): The iden of the vault to edit. key (str): Vault secret key. valu (str): Vault secret value. s_common.novalu to delete a key. Raises: synapse.exc.NoSuchIden: Vault with `iden` does not exist. synapse.exc.NotMsgpackSafe: One of `key` or `valu` is not msgpack safe. Returns: Updated vault. ''' vault = self.reqVault(iden) configs = vault.get('configs') delete = False if valu is s_common.novalu: if key not in configs: raise s_exc.BadArg(mesg=f'Key {key} not found in vault configs.') valu = None delete = True else: try: s_msgpack.en({key: valu}) except s_exc.NotMsgpackSafe as exc: raise s_exc.NotMsgpackSafe(mesg=f'Vault configs must be msgpack safe.') from None return await self._push('vault:data:set', iden, 'configs', key, valu, delete)
@s_nexus.Pusher.onPush('vault:data:set') async def _setVaultData(self, iden, obj, key, valu, delete): vault = self.reqVault(iden) data = vault.get(obj) bidn = s_common.uhex(iden) if delete: if key in data: data.pop(key) else: data[key] = valu self.slab.put(bidn, s_msgpack.en(vault), db=self.vaultsdb) return data
[docs] async def replaceVaultConfigs(self, iden, valu): ''' Replace the entire vault config. Args: iden (str): The iden of the vault to edit. valu (str): New configs object to store on the vault. Raises: synapse.exc.BadArg: `valu` is not a dictionary. synapse.exc.NoSuchIden: Vault with `iden` does not exist. synapse.exc.NotMsgpackSafe: `valu` is not msgpack safe. Returns: New configs. ''' vault = self.reqVault(iden) if not isinstance(valu, dict): raise s_exc.BadArg(mesg='valu must be a dictionary.', name='valu', valu=valu) try: s_msgpack.en(valu) except s_exc.NotMsgpackSafe: short = textwrap.shorten(repr(valu), width=64) raise s_exc.NotMsgpackSafe( mesg='Vault configs must be msgpack safe.', name='valu', valu=short) from None return await self._push('vault:data:replace', iden, 'configs', valu)
[docs] async def replaceVaultSecrets(self, iden, valu): ''' Replace the entire vault config. Args: iden (str): The iden of the vault to edit. valu (str): New secrets object to store on the vault. Raises: synapse.exc.BadArg: `valu` is not a dictionary. synapse.exc.NoSuchIden: Vault with `iden` does not exist. synapse.exc.NotMsgpackSafe: `valu` is not msgpack safe. Returns: New secrets. ''' vault = self.reqVault(iden) if not isinstance(valu, dict): raise s_exc.BadArg(mesg='valu must be a dictionary.', name='valu', valu=valu) try: s_msgpack.en(valu) except s_exc.NotMsgpackSafe: short = textwrap.shorten(repr(valu), width=64) raise s_exc.NotMsgpackSafe( mesg='Vault secrets must be msgpack safe.', name='valu', valu=short) from None return await self._push('vault:data:replace', iden, 'secrets', valu)
@s_nexus.Pusher.onPush('vault:data:replace') async def _replaceVaultData(self, iden, obj, valu): vault = self.reqVault(iden) bidn = s_common.uhex(iden) vault[obj] = valu self.slab.put(bidn, s_msgpack.en(vault), db=self.vaultsdb) return valu
[docs] def listVaults(self): ''' List all vaults. Args: None Raises: None Yields: tuples of vault info: (<iden>, <name>, <type>, <scope>). ''' for vault in self._getVaults(): yield vault
[docs] async def setVaultPerm(self, viden, iden, level): ''' Set vault permissions. Args: viden (str): The iden of the vault to edit. iden (str): Iden of the user/role to add permissions for. level (int): Easy perms level. Raises: synapse.exc.NoSuchIden: Vault with `iden` does not exist. Returns: Updated vault. ''' vault = self.reqVault(viden) scope = 'users' ident = self.auth.user(iden) if ident is None: scope = 'roles' ident = self.auth.role(iden) if ident is None: raise s_exc.NoSuchIden(mesg=f'Iden {iden} is not a valid user or role.') await self._setEasyPerm(vault, scope, ident.iden, level) permissions = vault.get('permissions') return await self._push(('vault:set'), viden, 'permissions', permissions)
[docs] async def renameVault(self, iden, name): ''' Rename a vault. Args: iden (str): Iden of the vault to rename. name (str): New vault name. Raises: synapse.exc.NoSuchIden: Vault with `iden` does not exist. synapse.exc.DupName: Vault with `name` already exists. Returns: Updated vault. ''' if self.getVaultByName(name) is not None: raise s_exc.DupName(mesg=f'Vault with name {name} already exists.') return await self._push(('vault:set'), iden, 'name', name)
@s_nexus.Pusher.onPush('vault:set') async def _setVault(self, iden, key, valu): if key not in ('name', 'permissions'): # pragma: no cover raise s_exc.BadArg('Only vault names and permissions can be changed.') vault = self.reqVault(iden) oldv = vault.get(key) vault[key] = valu s_schemas.reqValidVault(vault) bidn = s_common.uhex(iden) if key == 'name': self.slab.delete(oldv.encode(), db=self.vaultsbynamedb) self.slab.put(valu.encode(), bidn, db=self.vaultsbynamedb) self.slab.put(bidn, s_msgpack.en(vault), db=self.vaultsdb) return vault
[docs] @s_nexus.Pusher.onPushAuto('vault:del') async def delVault(self, iden): ''' Delete a vault. Args: iden (str): Iden of the vault to delete. Returns: None ''' vault = self.getVault(iden) if vault is None: return bidn = s_common.uhex(iden) name = vault.get('name') vtype = vault.get('type') scope = vault.get('scope') tsi = None if scope == 'global': tsi = f'{vtype}:global' elif scope in ('user', 'role'): owner = vault.get('owner') tsi = f'{vtype}:{scope}:{owner}' if tsi is not None: self.slab.delete(tsi.encode(), db=self.vaultsbyTSIdb) self.slab.delete(name.encode(), db=self.vaultsbynamedb) self.slab.delete(bidn, db=self.vaultsdb)
def _propAllowedReason(self, user, perms, gateiden=None, default=None): ''' Similar to allowed, but always prefer the default value specified by the caller. Default values are still pulled from permdefs if there is a match there; but still prefer caller default. This results in a ternary response that can be used to know if a rule had a positive/negative or no match. The matching reason metadata is also returned. ''' if default is None: permdef = self.getPermDef(perms) if permdef: default = permdef.get('default', default) return user.getAllowedReason(perms, gateiden=gateiden, default=default)
[docs] def confirmPropSet(self, user, prop, layriden): meta0 = self._propAllowedReason(user, prop.setperms[0], gateiden=layriden) if meta0.isadmin: return allowed0 = meta0.value meta1 = self._propAllowedReason(user, prop.setperms[1], gateiden=layriden) allowed1 = meta1.value if allowed0: if allowed1: return elif allowed1 is False: # This is a allow-with-precedence case. # Inspect meta to determine if the rule a0 is more specific than rule a1 if len(meta0.rule) >= len(meta1.rule): return user.raisePermDeny(prop.setperms[0], gateiden=layriden) return if allowed1: if allowed0 is None: return # allowed0 here is False. This is a deny-with-precedence case. # Inspect meta to determine if the rule a1 is more specific than rule a0 if len(meta1.rule) > len(meta0.rule): return user.raisePermDeny(prop.setperms[0], gateiden=layriden)
[docs] def confirmPropDel(self, user, prop, layriden): meta0 = self._propAllowedReason(user, prop.delperms[0], gateiden=layriden) if meta0.isadmin: return allowed0 = meta0.value meta1 = self._propAllowedReason(user, prop.delperms[1], gateiden=layriden) allowed1 = meta1.value if allowed0: if allowed1: return elif allowed1 is False: # This is a allow-with-precedence case. # Inspect meta to determine if the rule a0 is more specific than rule a1 if len(meta0.rule) >= len(meta1.rule): return user.raisePermDeny(prop.delperms[0], gateiden=layriden) return if allowed1: if allowed0 is None: return # allowed0 here is False. This is a deny-with-precedence case. # Inspect meta to determine if the rule a1 is more specific than rule a0 if len(meta1.rule) > len(meta0.rule): return user.raisePermDeny(prop.delperms[0], gateiden=layriden)
[docs] @contextlib.asynccontextmanager async def getTempCortex(mods=None): ''' Get a proxy to a cortex backed by a temporary directory. Args: mods (list): A list of modules which are loaded into the cortex. Notes: The cortex and temporary directory are town down on exit. This should only be called from synchronous code. Returns: Proxy to the cortex. ''' with s_common.getTempDir() as dirn: logger.debug(f'Creating temporary cortex as {dirn}') conf = { 'health:sysctl:checks': False, } async with await Cortex.anit(dirn, conf=conf) as core: if mods: for mod in mods: await core.loadCoreModule(mod) async with core.getLocalProxy() as prox: yield prox