Source code for synapse.lib.stormlib.graph

import synapse.exc as s_exc

import synapse.lib.config as s_config
import synapse.lib.msgpack as s_msgpack
import synapse.lib.schemas as s_schemas
import synapse.lib.stormtypes as s_stormtypes

# Graph projection fields a user may change through GraphLib._methGraphMod;
# any other key in the supplied info dict is rejected with BadArg.
USER_EDITABLE = {
    'desc', 'name',
    'refs', 'edges', 'forms', 'pivots',
    'degrees', 'filters',
    'filterinput', 'yieldfiltered',
}

@s_stormtypes.registry.registerLib
class GraphLib(s_stormtypes.Lib):
    '''
    A Storm Library for interacting with graph projections in the Cortex.
    '''
    _storm_lib_path = ('graph',)
    _storm_locals = (
        {'name': 'add', 'desc': '''
            Add a graph projection to the Cortex.

            Example:
                Add a graph projection named "Test Projection"::

                    $rules = ({
                        "name": "Test Projection",
                        "desc": "My test projection",
                        "degrees": 2,
                        "pivots": [" <(seen)- meta:source "],
                        "filters": ["-#nope"],
                        "forms": {
                            "inet:fqdn": {
                                "pivots": ["<- *", "-> *"],
                                "filters": ["-inet:fqdn:issuffix=1"]
                            },
                            "*": {
                                "pivots": ["-> #"],
                            }
                        }
                    })
                    $lib.graph.add($rules)
            ''',
         'type': {'type': 'function', '_funcname': '_methGraphAdd',
                  'args': (
                      {'name': 'gdef', 'type': 'dict', 'desc': 'A graph projection definition.'},
                  ),
                  'returns': {'type': 'null'}}},
        {'name': 'del', 'desc': 'Delete a graph projection from the Cortex.',
         'type': {'type': 'function', '_funcname': '_methGraphDel',
                  'args': (
                      {'name': 'iden', 'type': 'str', 'desc': 'The iden of the graph projection to delete.'},
                  ),
                  'returns': {'type': 'null'}}},
        {'name': 'get', 'desc': 'Get a graph projection definition from the Cortex.',
         'type': {'type': 'function', '_funcname': '_methGraphGet',
                  'args': (
                      {'name': 'iden', 'type': 'str', 'default': None,
                       'desc': 'The iden of the graph projection to get. If not specified, '
                               'returns the current graph projection.'},
                  ),
                  'returns': {'type': 'dict',
                              'desc': 'A graph projection definition, or None if no iden was '
                                      'specified and there is currently no graph projection set.'}}},
        {'name': 'mod', 'desc': 'Modify user editable properties of a graph projection.',
         'type': {'type': 'function', '_funcname': '_methGraphMod',
                  'args': (
                      {'name': 'iden', 'type': 'str', 'desc': 'The iden of the graph projection to modify.'},
                      {'name': 'info', 'type': 'dict', 'desc': 'A dictionary of the properties to edit.'},
                  ),
                  'returns': {'type': 'null'}}},
        {'name': 'list', 'desc': 'List the graph projections available in the Cortex.',
         'type': {'type': 'function', '_funcname': '_methGraphList',
                  'returns': {'type': 'list', 'desc': 'A list of graph projection definitions.'}}},
        {'name': 'grant', 'desc': 'Modify permissions granted to users/roles on a graph projection.',
         'type': {'type': 'function', '_funcname': '_methGraphGrant',
                  'args': (
                      {'name': 'gden', 'type': 'str',
                       'desc': 'Iden of the graph projection to modify.', },
                      {'name': 'scope', 'type': 'str',
                       'desc': 'The scope, either "users" or "roles".', },
                      {'name': 'iden', 'type': 'str',
                       'desc': 'The user/role iden depending on scope.', },
                      {'name': 'level', 'type': 'int',
                       'desc': 'The permission level number.', },
                  ),
                  'returns': {'type': 'null', }}},
        {'name': 'revoke', 'desc': 'Revoke permissions granted to users/roles on a graph projection.',
         'type': {'type': 'function', '_funcname': '_methGraphRevoke',
                  'args': (
                      {'name': 'gden', 'type': 'str', 'desc': 'Iden of the graph projection to modify.'},
                      {'name': 'scope', 'type': 'str', 'desc': 'The scope, either "users" or "roles".'},
                      {'name': 'iden', 'type': 'str', 'desc': 'The user/role iden depending on scope.'},
                  ),
                  'returns': {'type': 'null'}}},
        {'name': 'activate', 'desc': 'Set the graph projection to use for the top level Storm Runtime.',
         'type': {'type': 'function', '_funcname': '_methGraphActivate',
                  'args': (
                      {'name': 'iden', 'type': 'str', 'desc': 'The iden of the graph projection to use.'},
                  ),
                  'returns': {'type': 'null'}}},
    )

    def getObjLocals(self):
        '''
        Map the Storm-visible names declared in _storm_locals to their bound methods.
        '''
        return {
            'add': self._methGraphAdd,
            'del': self._methGraphDel,
            'get': self._methGraphGet,
            'mod': self._methGraphMod,
            'list': self._methGraphList,
            'grant': self._methGraphGrant,
            'revoke': self._methGraphRevoke,
            'activate': self._methGraphActivate,
        }

    async def _methGraphAdd(self, gdef):
        '''
        Add a graph projection on behalf of the runtime user.

        Args:
            gdef: A graph projection definition; converted to a primitive before use.

        Returns:
            Whatever the core's addStormGraph() returns.
        '''
        gdef = await s_stormtypes.toprim(gdef)
        return await self.runt.snap.core.addStormGraph(gdef, user=self.runt.user)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methGraphGet(self, iden=None):
        '''
        Get a graph projection definition.

        Args:
            iden: The iden of the graph projection, or None to get the
                graph projection currently set on the runtime.

        Returns:
            The result of runt.getGraph() when iden is None, otherwise the
            result of the core's getStormGraph() for the runtime user.
        '''
        iden = await s_stormtypes.tostr(iden, noneok=True)
        if iden is None:
            return self.runt.getGraph()

        return await self.runt.snap.core.getStormGraph(iden, user=self.runt.user)

    async def _methGraphDel(self, iden):
        '''
        Delete the graph projection with the given iden on behalf of the runtime user.
        '''
        iden = await s_stormtypes.tostr(iden)
        await self.runt.snap.core.delStormGraph(iden, user=self.runt.user)

    async def _methGraphMod(self, iden, info):
        '''
        Modify user editable fields of a graph projection.

        Args:
            iden: The iden of the graph projection to modify.
            info: A dictionary of the fields to edit.

        Raises:
            s_exc.BadArg: If info contains a key which is not in USER_EDITABLE.
        '''
        iden = await s_stormtypes.tostr(iden)
        info = await s_stormtypes.toprim(info)

        # Validate every key up front so that nothing is sent to the core
        # when any one of the requested fields is not editable.
        for prop in info.keys():
            if prop not in USER_EDITABLE:
                raise s_exc.BadArg(mesg=f'User may not edit the field: {prop}.')

        await self.runt.snap.core.modStormGraph(iden, info, user=self.runt.user)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methGraphList(self):
        '''
        List the graph projections the core yields for the runtime user.

        Returns:
            A list of graph projection definitions sorted by their "name" value.
        '''
        core = self.runt.snap.core
        projs = [proj async for proj in core.getStormGraphs(user=self.runt.user)]
        # sorted() already returns a new list, so no extra list() wrapper is needed.
        return sorted(projs, key=lambda x: x.get('name'))

    async def _methGraphGrant(self, gden, scope, iden, level):
        '''
        Set the permission level for a user or role on a graph projection.

        Args:
            gden: Iden of the graph projection to modify.
            scope: The scope string ("users" or "roles" per the Storm docs above).
            iden: The user/role iden depending on scope.
            level: The permission level number; None is accepted and passed through.
        '''
        gden = await s_stormtypes.tostr(gden)
        scope = await s_stormtypes.tostr(scope)
        iden = await s_stormtypes.tostr(iden)
        level = await s_stormtypes.toint(level, noneok=True)

        await self.runt.snap.core.setStormGraphPerm(gden, scope, iden, level, user=self.runt.user)

    async def _methGraphRevoke(self, gden, scope, iden):
        '''
        Revoke the permission for a user or role on a graph projection.

        Args:
            gden: Iden of the graph projection to modify.
            scope: The scope string ("users" or "roles" per the Storm docs above).
            iden: The user/role iden depending on scope.
        '''
        gden = await s_stormtypes.tostr(gden)
        scope = await s_stormtypes.tostr(scope)
        iden = await s_stormtypes.tostr(iden)

        # Revocation is expressed as setting the permission level to None.
        await self.runt.snap.core.setStormGraphPerm(gden, scope, iden, None, user=self.runt.user)

    async def _methGraphActivate(self, iden):
        '''
        Set the graph projection used by the runtime.

        The definition is looked up through _methGraphGet() and then handed
        to runt.setGraph().
        '''
        gdef = await self._methGraphGet(iden)
        self.runt.setGraph(gdef)