Source code for synapse.lib.stormlib.storm

import logging

import synapse.exc as s_exc

import synapse.lib.stormtypes as s_stormtypes

# Description text attached to the ``eval`` entry of LibStorm._storm_locals.
evaldesc = '''\
Evaluate a storm runtime value and optionally cast/coerce it.

NOTE: If storm logging is enabled, the expression being evaluated will be logged
separately.
'''

# Logger that LibStorm._evalStorm writes to when storm logging is enabled.
stormlogger = logging.getLogger('synapse.storm')

@s_stormtypes.registry.registerLib
class LibStorm(s_stormtypes.Lib):
    '''
    A Storm library for evaluating dynamic storm expressions.
    '''
    # Declarative description of the one function this library exports.
    _storm_locals = (
        {
            'name': 'eval',
            'desc': evaldesc,
            'type': {
                'type': 'function',
                '_funcname': '_evalStorm',
                'args': (
                    {'name': 'text', 'type': 'str', 'desc': 'A storm expression string.'},
                    {'name': 'cast', 'type': 'str', 'desc': 'A type to cast the result to.', 'default': None},
                ),
                'returns': {'type': 'any', 'desc': 'The value of the expression and optional cast.'},
            },
        },
    )
    _storm_lib_path = ('storm',)

    def getObjLocals(self):
        '''
        Return the mapping of exported local names to their implementations.
        '''
        return {'eval': self._evalStorm}

    @s_stormtypes.stormfunc(readonly=True)
    async def _evalStorm(self, text, cast=None):
        '''
        Evaluate a storm expression and optionally normalize the result.

        Args:
            text: The storm expression; converted with ``s_stormtypes.tostr``.
            cast: Optional name of a model type or property whose type is used
                to normalize the result; converted with ``s_stormtypes.tostr``
                (``None`` is allowed).

        Returns:
            The computed value, or the first element of ``casttype.norm(valu)``
            when a cast type was resolved.

        Raises:
            s_exc.NoSuchType: If ``cast`` is given but names neither a model
                type nor a model property.
        '''
        text = await s_stormtypes.tostr(text)
        cast = await s_stormtypes.tostr(cast, noneok=True)

        core = self.runt.snap.core

        if core.stormlog:
            extra = await core.getLogExtra(text=text)
            logmesg = f'Executing storm query via $lib.storm.eval() {{{text}}} as [{self.runt.user.name}]'
            stormlogger.info(logmesg, extra=extra)

        # Resolve the cast target before evaluating, so a bad name fails early.
        casttype = None
        if cast:
            model = self.runt.model

            # A type name takes precedence; otherwise fall back to a property's type.
            casttype = model.type(cast)
            if casttype is None:
                prop = model.prop(cast)
                casttype = None if prop is None else prop.type

            if casttype is None:
                raise s_exc.NoSuchType(mesg=f'No type or property found for name: {cast}')

        evalnode = await core._getStormEval(text)
        valu = await evalnode.compute(self.runt, None)

        if not casttype:
            return valu

        # norm() yields a pair; only the normalized value is returned.
        normed, _ = casttype.norm(valu)
        return normed