import synapse.exc as s_exc
import synapse.lib.config as s_config
import synapse.lib.msgpack as s_msgpack
import synapse.lib.schemas as s_schemas
import synapse.lib.stormtypes as s_stormtypes
gdefSchema = {
'type': 'object',
'properties': {
'iden': {'type': 'string', 'pattern': s_config.re_iden},
'name': {'type': 'string', 'minLength': 1},
'desc': {'type': 'string', 'default': ''},
'scope': {'type': 'string', 'enum': ['user', 'power-up']},
'creator': {'type': 'string', 'pattern': s_config.re_iden},
'power-up': {'type': 'string', 'minLength': 1},
'maxsize': {'type': 'number', 'minimum': 0},
'existing': {'type': 'array', 'items': {'type': 'string'}},
'created': {'type': 'number'},
'updated': {'type': 'number'},
'refs': {'type': 'boolean', 'default': False},
'edges': {'type': 'boolean', 'default': True},
'edgelimit': {'type': 'number', 'default': 3000},
'degrees': {'type': ['integer', 'null'], 'minimum': 0},
'filterinput': {'type': 'boolean', 'default': True},
'yieldfiltered': {'type': 'boolean', 'default': False},
'filters': {
'type': ['array', 'null'],
'items': {'type': 'string'}
},
'pivots': {
'type': ['array', 'null'],
'items': {'type': 'string'}
},
'forms': {
'type': 'object',
'patternProperties': {
'^.*$': {
'type': 'object',
'properties': {
'filters': {
'type': ['array', 'null'],
'items': {'type': 'string'}
},
'pivots': {
'type': ['array', 'null'],
'items': {'type': 'string'}
}
},
'additionalProperties': False,
}
}
},
'permissions': s_msgpack.deepcopy(s_schemas.easyPermSchema)
},
'additionalProperties': False,
'required': ['iden', 'name', 'scope'],
'allOf': [
{
'if': {'properties': {'scope': {'const': 'power-up'}}},
'then': {'required': ['power-up']},
'else': {'required': ['creator']},
}
]
}
reqValidGdef = s_config.getJsValidator(gdefSchema)
USER_EDITABLE = {
'desc',
'name',
'refs',
'edges',
'forms',
'pivots',
'degrees',
'filters',
'filterinput',
'yieldfiltered'
}
[docs]
@s_stormtypes.registry.registerLib
class GraphLib(s_stormtypes.Lib):
'''
A Storm Library for interacting with graph projections in the Cortex.
'''
_storm_lib_path = ('graph',)
_storm_locals = (
{'name': 'add',
'desc': '''
Add a graph projection to the Cortex.
Example:
Add a graph projection named "Test Projection"::
$rules = ({
"name": "Test Projection",
"desc": "My test projection",
"degrees": 2,
"pivots": ["-> meta:seen"],
"filters": ["-#nope"],
"forms": {
"inet:fqdn": {
"pivots": ["<- *", "-> *"],
"filters": ["-inet:fqdn:issuffix=1"]
},
"*": {
"pivots": ["-> #"],
}
}
})
$lib.graph.add($rules)
''',
'type': {'type': 'function', '_funcname': '_methGraphAdd',
'args': (
{'name': 'gdef', 'type': 'dict', 'desc': 'A graph projection definition.'},
),
'returns': {'type': 'null'}}},
{'name': 'del', 'desc': 'Delete a graph projection from the Cortex.',
'type': {'type': 'function', '_funcname': '_methGraphDel',
'args': (
{'name': 'iden', 'type': 'str', 'desc': 'The iden of the graph projection to delete.'},
),
'returns': {'type': 'null'}}},
{'name': 'get', 'desc': 'Get a graph projection definition from the Cortex.',
'type': {'type': 'function', '_funcname': '_methGraphGet',
'args': (
{'name': 'iden', 'type': 'str', 'default': None,
'desc': 'The iden of the graph projection to get. If not specified, '
'returns the current graph projection.'},
),
'returns': {'type': 'dict',
'desc': 'A graph projection definition, or None if no iden was '
'specified and there is currently no graph projection set.'}}},
{'name': 'mod', 'desc': 'Modify user editable properties of a graph projection.',
'type': {'type': 'function', '_funcname': '_methGraphMod',
'args': (
{'name': 'iden', 'type': 'str', 'desc': 'The iden of the graph projection to modify.'},
{'name': 'info', 'type': 'dict', 'desc': 'A dictionary of the properties to edit.'},
),
'returns': {'type': 'null'}}},
{'name': 'list', 'desc': 'List the graph projections available in the Cortex.',
'type': {'type': 'function', '_funcname': '_methGraphList',
'returns': {'type': 'list', 'desc': 'A list of graph projection definitions.'}}},
{'name': 'grant', 'desc': 'Modify permissions granted to users/roles on a graph projection.',
'type': {'type': 'function', '_funcname': '_methGraphGrant',
'args': (
{'name': 'gden', 'type': 'str', 'desc': 'Iden of the graph projection to modify.', },
{'name': 'scope', 'type': 'str', 'desc': 'The scope, either "users" or "roles".', },
{'name': 'iden', 'type': 'str', 'desc': 'The user/role iden depending on scope.', },
{'name': 'level', 'type': 'int', 'desc': 'The permission level number.', },
),
'returns': {'type': 'null', }}},
{'name': 'revoke', 'desc': 'Revoke permissions granted to users/roles on a graph projection.',
'type': {'type': 'function', '_funcname': '_methGraphRevoke',
'args': (
{'name': 'gden', 'type': 'str', 'desc': 'Iden of the graph projection to modify.'},
{'name': 'scope', 'type': 'str', 'desc': 'The scope, either "users" or "roles".'},
{'name': 'iden', 'type': 'str', 'desc': 'The user/role iden depending on scope.'},
),
'returns': {'type': 'null'}}},
{'name': 'activate', 'desc': 'Set the graph projection to use for the top level Storm Runtime.',
'type': {'type': 'function', '_funcname': '_methGraphActivate',
'args': (
{'name': 'iden', 'type': 'str',
'desc': 'The iden of the graph projection to use.'},
),
'returns': {'type': 'null'}}},
)
[docs]
def getObjLocals(self):
return {
'add': self._methGraphAdd,
'del': self._methGraphDel,
'get': self._methGraphGet,
'mod': self._methGraphMod,
'list': self._methGraphList,
'grant': self._methGraphGrant,
'revoke': self._methGraphRevoke,
'activate': self._methGraphActivate,
}
async def _methGraphAdd(self, gdef):
gdef = await s_stormtypes.toprim(gdef)
return await self.runt.snap.core.addStormGraph(gdef, user=self.runt.user)
@s_stormtypes.stormfunc(readonly=True)
async def _methGraphGet(self, iden=None):
iden = await s_stormtypes.tostr(iden, noneok=True)
if iden is None:
return self.runt.getGraph()
return await self.runt.snap.core.getStormGraph(iden, user=self.runt.user)
async def _methGraphDel(self, iden):
iden = await s_stormtypes.tostr(iden)
await self.runt.snap.core.delStormGraph(iden, user=self.runt.user)
async def _methGraphMod(self, iden, info):
iden = await s_stormtypes.tostr(iden)
info = await s_stormtypes.toprim(info)
for prop in info.keys():
if prop not in USER_EDITABLE:
raise s_exc.BadArg(mesg=f'User may not edit the field: {prop}.')
await self.runt.snap.core.modStormGraph(iden, info, user=self.runt.user)
@s_stormtypes.stormfunc(readonly=True)
async def _methGraphList(self):
projs = []
async for proj in self.runt.snap.core.getStormGraphs(user=self.runt.user):
projs.append(proj)
return list(sorted(projs, key=lambda x: x.get('name')))
async def _methGraphGrant(self, gden, scope, iden, level):
gden = await s_stormtypes.tostr(gden)
scope = await s_stormtypes.tostr(scope)
iden = await s_stormtypes.tostr(iden)
level = await s_stormtypes.toint(level, noneok=True)
await self.runt.snap.core.setStormGraphPerm(gden, scope, iden, level, user=self.runt.user)
async def _methGraphRevoke(self, gden, scope, iden):
gden = await s_stormtypes.tostr(gden)
scope = await s_stormtypes.tostr(scope)
iden = await s_stormtypes.tostr(iden)
await self.runt.snap.core.setStormGraphPerm(gden, scope, iden, None, user=self.runt.user)
async def _methGraphActivate(self, iden):
gdef = await self._methGraphGet(iden)
self.runt.setGraph(gdef)