import logging
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.storm as s_storm
import synapse.lib.stormtypes as s_stormtypes
evaldesc = '''\
Evaluate a Storm runtime value and optionally cast/coerce it.
Note:
If Storm logging is enabled, the expression being evaluated will be logged
separately.
'''
rundesc = '''
Run a Storm query and yield the messages output by the Storm interpreter.
Note:
If Storm logging is enabled, the query being run will be logged separately.
'''
stormlogger = logging.getLogger('synapse.storm')
[docs]
class StormExecCmd(s_storm.Cmd):
'''
Execute text or an embedded query object as Storm in the current pipeline.
NOTE: It is recommended to avoid using this where possible to avoid potential
query injection risks. If you must use this, take care to ensure any values
being executed have been properly sanitized.
Examples:
// Add nodes using text in a variable
$query = '[ inet:fqdn=foo.com inet:fqdn=bar.net ]'
storm.exec $query
// Filter nodes in the pipeline using text in a variable
$filter = '-:asn=10'
inet:ipv4:asn
storm.exec $filter
// Pivot using an embedded query
$pivot = ${ -> inet:asn }
inet:ipv4:asn
storm.exec $pivot
'''
name = 'storm.exec'
[docs]
def getArgParser(self):
pars = s_storm.Cmd.getArgParser(self)
pars.add_argument('query', help='The Storm to execute.')
return pars
[docs]
async def execStormCmd(self, runt, genr):
if self.runtsafe:
text = await s_stormtypes.tostr(self.opts.query)
query = await runt.getStormQuery(text)
extra = await self.runt.snap.core.getLogExtra(text=text, view=self.runt.snap.view.iden)
stormlogger.info(f'Executing storm query via storm.exec {{{text}}} as [{self.runt.user.name}]', extra=extra)
async with runt.getSubRuntime(query) as subr:
async for subp in subr.execute(genr=genr):
yield subp
else:
item = None
async for item in genr:
break
text = await s_stormtypes.tostr(self.opts.query)
query = await runt.getStormQuery(text)
extra = await self.runt.snap.core.getLogExtra(text=text, view=self.runt.snap.view.iden)
stormlogger.info(f'Executing storm query via storm.exec {{{text}}} as [{self.runt.user.name}]', extra=extra)
async with runt.getSubRuntime(query) as subr:
async for subp in subr.execute(genr=s_common.agen(item)):
yield subp
async for item in genr:
text = await s_stormtypes.tostr(self.opts.query)
query = await runt.getStormQuery(text)
subr.runtvars.clear()
subr.query = query
subr._initRuntVars(query)
extra = await self.runt.snap.core.getLogExtra(text=text, view=self.runt.snap.view.iden)
stormlogger.info(f'Executing storm query via storm.exec {{{text}}} as [{self.runt.user.name}]', extra=extra)
async for subp in subr.execute(genr=s_common.agen(item)):
yield subp
[docs]
@s_stormtypes.registry.registerLib
class LibStorm(s_stormtypes.Lib):
'''
A Storm library for evaluating dynamic storm expressions.
'''
_storm_locals = (
{'name': 'eval', 'desc': evaldesc,
'type': {'type': 'function', '_funcname': '_evalStorm',
'args': (
{'name': 'text', 'type': 'str', 'desc': 'A storm expression string.'},
{'name': 'cast', 'type': 'str', 'desc': 'A type to cast the result to.', 'default': None},
),
'returns': {'type': 'any', 'desc': 'The value of the expression and optional cast.'}}},
{'name': 'run', 'desc': rundesc,
'type': {'type': 'function', '_funcname': '_runStorm',
'args': (
{'name': 'query', 'type': 'str', 'desc': 'A Storm query string.'},
{'name': 'opts', 'type': 'dict', 'desc': 'Storm options dictionary.', 'default': None},
),
'returns': {'name': 'yields', 'type': 'list', 'desc': 'The output messages from the Storm runtime.'}}},
)
_storm_lib_path = ('storm',)
[docs]
def getObjLocals(self):
return {
'run': self._runStorm,
'eval': self._evalStorm,
}
async def _runStorm(self, query, opts=None):
opts = await s_stormtypes.toprim(opts)
query = await s_stormtypes.tostr(query)
if opts is None:
opts = {}
user = opts.get('user')
if user is None:
user = opts['user'] = self.runt.user.iden
if user != self.runt.user.iden:
self.runt.confirm(('impersonate',))
opts.setdefault('view', self.runt.snap.view.iden)
async for mesg in self.runt.snap.view.core.storm(query, opts=opts):
yield mesg
@s_stormtypes.stormfunc(readonly=True)
async def _evalStorm(self, text, cast=None):
text = await s_stormtypes.tostr(text)
cast = await s_stormtypes.tostr(cast, noneok=True)
if self.runt.snap.core.stormlog:
extra = await self.runt.snap.core.getLogExtra(text=text, view=self.runt.snap.view.iden)
stormlogger.info(f'Executing storm query via $lib.storm.eval() {{{text}}} as [{self.runt.user.name}]', extra=extra)
casttype = None
if cast:
casttype = self.runt.model.type(cast)
if casttype is None:
castprop = self.runt.model.prop(cast)
if castprop is not None:
casttype = castprop.type
if casttype is None:
mesg = f'No type or property found for name: {cast}'
raise s_exc.NoSuchType(mesg=mesg)
asteval = await self.runt.snap.core._getStormEval(text)
valu = await asteval.compute(self.runt, None)
if casttype:
valu, _ = casttype.norm(valu)
return valu