import os
import copy
import json
import logging
import pathlib
import contextlib
import synapse.common as s_common
import synapse.cortex as s_cortex
import synapse.lib.base as s_base
import synapse.lib.cmdr as s_cmdr
import synapse.lib.msgpack as s_msgpack
import synapse.tools.storm as s_t_storm
# Names of the loggers whose level is temporarily raised by suppress_logging().
loggers_to_supress = (
'synapse.lib.view',
)
def getDocPath(fn, root=None):
    '''
    Helper for getting the full path of a documentation data file.

    Args:
        fn (str): Name of the file to retrieve the full path for.
        root (str): Optional root path to look for a docdata directory in.

    Notes:
        Defaults to looking for the ``docdata`` directory in the current
        working directory, walking up through parent directories until one
        is found. This behavior works fine for notebooks nested in the docs
        directory of synapse; but the starting directory may be overridden
        by providing an alternative root.

    Returns:
        str: An absolute file path.

    Raises:
        ValueError: If no docdata directory can be found, if the file does
            not exist, or if directory traversal is attempted.
    '''
    # Start from an absolute path so that a relative root still walks up
    # through its real parents and compares cleanly with the file path below.
    cwd = pathlib.Path(os.path.abspath(root)) if root else pathlib.Path(os.getcwd())

    # Walk up one directory at a time until a 'docdata' directory is found.
    while True:
        dpath = cwd.joinpath('docdata')
        if dpath.is_dir():
            break
        parent = cwd.parent
        if parent == cwd:
            # Reached the filesystem root without finding a docdata directory.
            raise ValueError(f'Unable to find data directory from {os.getcwd()}.')
        cwd = parent

    # Protect against traversal. The separator is appended before the prefix
    # test so that a sibling such as 'docdata2' is not mistaken for a child
    # of 'docdata'.
    base = os.path.abspath(dpath)
    fpath = os.path.abspath(os.path.join(base, fn))
    if fpath != base and not fpath.startswith(base + os.sep):
        raise ValueError(f'Path escaping detected: {fn}')

    # Existence (this also rejects the docdata directory itself).
    if not os.path.isfile(fpath):
        raise ValueError(f'File does not exist: {fn}')

    return fpath
def getDocData(fp, root=None):
    '''
    Load the contents of a documentation data file.

    Args:
        fp (str): Name of the file to retrieve the data of.
        root (str): Optional root path to look for a docdata directory in.

    Notes:
        Will detect json/jsonl/yaml/mpk extensions and automatically
        decode that data if found; otherwise it returns bytes.

        The file is located with ``getDocPath``, so the ``docdata``
        directory is looked for starting at the current working directory
        unless an alternative root is provided.

    Returns:
        data: May be deserialized data or bytes.

    Raises:
        ValueError: If the file does not exist or directory traversal is attempted.
    '''
    fpath = getDocPath(fp, root)

    # These loaders take a path, so the file is not opened here for them.
    path_loaders = (
        ('.yaml', s_common.yamlload),
        ('.json', s_common.jsload),
    )
    for suffix, loader in path_loaders:
        if fpath.endswith(suffix):
            return loader(fpath)

    # Everything else is read from an open file handle.
    with s_common.genfile(fpath) as fd:

        if fpath.endswith('.mpk'):
            return s_msgpack.un(fd.read())

        if fpath.endswith('.jsonl'):
            # One JSON document per line.
            return [json.loads(row.decode()) for row in fd]

        return fd.read()
@contextlib.asynccontextmanager
async def genTempCoreProxy(mods=None):
    '''
    Async context manager yielding a proxy to a temporary Cortex.

    Args:
        mods (list): Optional CoreModules to load into the Cortex before the proxy is made.

    Notes:
        The Cortex is created in a temporary directory; leaving the context
        exits the proxy, the Cortex and the temporary directory contexts in that order.
        The Cortex itself is stashed on the proxy as ``_core``.
    '''
    with s_common.getTempDir() as dirn:
        async with await s_cortex.Cortex.anit(dirn) as core:

            for mod in mods or ():
                await core.loadCoreModule(mod)

            async with core.getLocalProxy() as prox:
                # Set the attribute with object.__setattr__ so the proxy's own
                # attribute magic is bypassed.
                object.__setattr__(prox, '_core', core)
                yield prox
@contextlib.asynccontextmanager
async def genTempStormsvcProxy(cmdrcore, svcname, svcctor, conf=None):
    '''
    Async context manager yielding a proxy to a temporary Storm service.

    The service is constructed in a temporary directory, given a root
    password, bound to a local TCP port and then registered with the
    Cortex behind ``cmdrcore`` under ``svcname``.

    Args:
        cmdrcore: A helper exposing an awaitable ``storm(text)`` method (e.g. CmdrCore or StormCore).
        svcname (str): Name to register the Storm service under.
        svcctor: Storm service constructor (e.g. Example.anit).
        conf (dict): Optional conf for the Storm service. It is deep-copied and never mutated.

    Notes:
        The service object itself is stashed on the proxy as ``_svc``.
    '''
    # Work on a private copy so the caller's dictionary is left untouched.
    conf = {} if conf is None else copy.deepcopy(conf)

    with s_common.getTempDir() as dirn:
        async with await svcctor(dirn, conf=conf) as svc:

            # Give root a known password so it can be embedded in the URL below.
            root = await svc.auth.getUserByName('root')
            await root.setPasswd('secret')

            # Listen on port 0 and read the actually-bound port back from the
            # returned (host, port) pair.
            info = await svc.dmon.listen('tcp://127.0.0.1:0/')
            svc.dmon.test_addr = info
            _host, port = info

            # Credentials and address match the password and listen URL above.
            surl = f'tcp://root:secret@127.0.0.1:{port}/'

            await cmdrcore.storm(f'service.add {svcname} {surl}')
            await cmdrcore.storm(f'$lib.service.wait({svcname})')

            async with svc.getLocalProxy() as prox:
                # Set the attribute with object.__setattr__ so the proxy's own
                # attribute magic is bypassed.
                object.__setattr__(prox, '_svc', svc)
                yield prox
async def getItemStorm(prox, outp=None):
    '''
    Get a Storm CLI instance for the given proxy, with line echoing enabled.

    Args:
        prox: The item handed to the StormCli constructor.
        outp: Optional output helper for the CLI.

    Returns:
        The constructed StormCli instance.
    '''
    cli = await s_t_storm.StormCli.anit(prox, outp=outp)
    cli.echoline = True
    return cli
async def getItemCmdr(prox, outp=None, locs=None):
    '''
    Get a Cmdr instance with prepopulated locs and line echoing enabled.

    Args:
        prox: The item handed to ``s_cmdr.getItemCmdr``.
        outp: Optional output helper for the Cmdr.
        locs (dict): Optional values merged into the Cmdr's ``locs``.

    Returns:
        The constructed Cmdr instance.
    '''
    cmdr = await s_cmdr.getItemCmdr(prox, outp=outp)
    cmdr.echoline = True

    if not locs:
        return cmdr

    cmdr.locs.update(locs)
    return cmdr
@contextlib.contextmanager
def suppress_logging(suppress):
    '''
    Context manager to suppress specific loggers.

    Args:
        suppress (bool): If falsy, the context manager does nothing.

    Notes:
        Every logger named in ``loggers_to_supress`` has its level raised by
        100 for the duration of the block; the previous level is restored on
        exit, including when the block raises.
    '''
    if not suppress:
        yield None
        return

    saved = {}
    try:
        for name in loggers_to_supress:
            target = logging.getLogger(name)
            # Remember the old level before bumping it.
            saved[name] = (target, target.level)
            target.setLevel(target.level + 100)
        yield None
    finally:
        for target, oldlevel in saved.values():
            target.setLevel(oldlevel)
class StormCore(s_base.Base):
    '''
    A helper for jupyter/storm CLI interaction.

    Wraps a Cortex proxy together with a Storm CLI built on top of it.
    '''
    async def __anit__(self, core, outp=None):
        await s_base.Base.__anit__(self)
        self.core = core
        self.stormcli = await getItemStorm(self.core, outp=outp)
        self.onfini(self._onCmdrCoreFini)
        # Placeholder for an async context manager which owns self.core.
        self.acm = None

    async def _onCmdrCoreFini(self):
        '''Tear down the Storm CLI and, when present, the owning context manager.'''
        await self.stormcli.fini()
        # self.core is not fini()'d directly; when self.acm is set, its
        # __aexit__ is expected to handle the self.core teardown.
        if self.acm:
            await self.acm.__aexit__(None, None, None)

    @contextlib.contextmanager
    def suppress_logging(self, suppress):
        '''
        Context manager to suppress specific loggers.

        Delegates to the module level ``suppress_logging`` helper.
        '''
        with suppress_logging(suppress):
            yield None

    async def runCmdLine(self, text, opts=None):
        '''
        Run a line of text directly via storm cli.
        '''
        await self.stormcli.runCmdLine(text, opts=opts)

    async def _runStorm(self, text, opts=None, cli=False, suppress_logging=False):
        '''Run a query via the CLI or directly on the core and collect the messages.'''
        collected = []

        with self.suppress_logging(suppress_logging):

            if not cli:
                # Straight to the core: gather every message it yields.
                async for mesg in self.core.storm(text, opts=opts):
                    collected.append(mesg)
                return collected

            def onEvent(event):
                # CLI events carry the storm message under the 'mesg' key.
                collected.append(event[1].get('mesg'))

            with self.stormcli.onWith('storm:mesg', onEvent):
                await self.runCmdLine(text, opts=opts)

        return collected

    async def storm(self, text, opts=None, num=None, cli=False, suppress_logging=False):
        '''
        A helper for executing a storm command and getting a list of storm messages.

        Args:
            text (str): Storm command to execute.
            opts (dict): Opt to pass along during execution.
            num (int): Number of nodes to expect in the output query. An AssertionError is raised on mismatch.
            cli (bool): If True, executes the line via the Storm CLI and will send output to outp.
            suppress_logging (bool): If True, suppresses some logging related to Storm runtime exceptions.

        Returns:
            list: A list of storm messages.

        Raises:
            AssertionError: If num is given and the number of node messages differs.
        '''
        mesgs = await self._runStorm(text, opts, cli, suppress_logging)

        if num is None:
            return mesgs

        count = sum(1 for m in mesgs if m[0] == 'node')
        if count != num:
            raise AssertionError(f'Expected {num} nodes, got {count}')

        return mesgs
class CmdrCore(s_base.Base):
    '''
    A helper for jupyter/cmdr CLI interaction.

    Wraps a Cortex proxy together with a Cmdr instance built on top of it.
    '''
    async def __anit__(self, core, outp=None):
        await s_base.Base.__anit__(self)
        # Prepended to query text when run through cmdr; may eventually be removed or changed.
        self.prefix = 'storm'
        self.core = core
        self.cmdr = await getItemCmdr(self.core, outp=outp, locs={'storm:hide-unknown': True})
        self.onfini(self._onCmdrCoreFini)
        # Placeholder for an async context manager which owns self.core.
        self.acm = None

    async def addFeedData(self, name, items, *, viewiden=None):
        '''
        Add feed data to the cortex.
        '''
        return await self.core.addFeedData(name, items, viewiden=viewiden)

    async def runCmdLine(self, text):
        '''
        Run a line of text directly via cmdr.
        '''
        await self.cmdr.runCmdLine(text)

    @contextlib.contextmanager
    def suppress_logging(self, suppress):
        '''
        Context manager to suppress specific loggers.

        Delegates to the module level ``suppress_logging`` helper.
        '''
        with suppress_logging(suppress):
            yield None

    async def _runStorm(self, text, opts=None, cmdr=False, suppress_logging=False):
        '''Run a query via cmdr or directly on the core and collect the messages.'''
        collected = []

        with self.suppress_logging(suppress_logging):

            if not cmdr:
                # Straight to the core: gather every message it yields.
                async for mesg in self.core.storm(text, opts=opts):
                    collected.append(mesg)
                return collected

            if self.prefix:
                text = ' '.join((self.prefix, text))

            def onEvent(event):
                # Cmdr events carry the storm message under the 'mesg' key.
                collected.append(event[1].get('mesg'))

            with self.cmdr.onWith('storm:mesg', onEvent):
                await self.runCmdLine(text)

        return collected

    async def storm(self, text, opts=None, num=None, cmdr=False, suppress_logging=False):
        '''
        A helper for executing a storm command and getting a list of storm messages.

        Args:
            text (str): Storm command to execute.
            opts (dict): Opt to pass to the cortex during execution.
            num (int): Number of nodes to expect in the output query. An AssertionError is raised on mismatch.
            cmdr (bool): If True, executes the line via the Cmdr CLI and will send output to outp.
            suppress_logging (bool): If True, suppresses some logging related to Storm runtime exceptions.

        Notes:
            The opts dictionary will not be used if cmdr=True.

        Returns:
            list: A list of storm messages.

        Raises:
            AssertionError: If num is given and the number of node messages differs.
        '''
        mesgs = await self._runStorm(text, opts, cmdr, suppress_logging)

        if num is None:
            return mesgs

        count = sum(1 for m in mesgs if m[0] == 'node')
        if count != num:
            raise AssertionError(f'Expected {num} nodes, got {count}')

        return mesgs

    async def eval(self, text, opts=None, num=None, cmdr=False):
        '''
        A helper for executing a storm command and getting a list of packed nodes.

        Args:
            text (str): Storm command to execute.
            opts (dict): Opt to pass to the cortex during execution.
            num (int): Number of nodes to expect in the output query. An AssertionError is raised on mismatch.
            cmdr (bool): If True, executes the line via the Cmdr CLI and will send output to outp.

        Notes:
            The opts dictionary will not be used if cmdr=True.

        Returns:
            list: A list of packed nodes.

        Raises:
            AssertionError: If an 'err' message is seen, or num is given and the node count differs.
        '''
        mesgs = await self._runStorm(text, opts, cmdr)

        # NOTE: '{ {text} }' formats a one-element set, so the query text is
        # rendered inside set braces in these messages.
        for mesg in mesgs:
            if mesg[0] == 'err':  # pragma: no cover
                raise AssertionError(f'Query { {text} } got err: {mesg!r}')

        nodes = [m[1] for m in mesgs if m[0] == 'node']

        if num is not None and len(nodes) != num:  # pragma: no cover
            raise AssertionError(f'Query { {text} } expected {num} nodes, got {len(nodes)}')

        return nodes

    async def _onCmdrCoreFini(self):
        '''Tear down the Cmdr and, when present, the owning context manager.'''
        await self.cmdr.fini()
        # self.core is not fini()'d directly; when self.acm is set, its
        # __aexit__ is expected to handle the self.core teardown.
        if self.acm:
            await self.acm.__aexit__(None, None, None)
async def getTempCoreProx(mods=None):
    '''
    Get a Telepath Proxy to a Cortex instance which is backed by a temporary Cortex.

    Args:
        mods (list): A list of additional CoreModules to load in the Cortex.

    Notes:
        The Proxy returned by this should be fini()'d to tear down the temporary Cortex.

    Returns:
        s_telepath.Proxy
    '''
    ctx = genTempCoreProxy(mods)
    prox = await ctx.__aenter__()

    # Stash the context manager on the proxy with object.__setattr__ so the
    # proxy's own attribute magic is bypassed.
    object.__setattr__(prox, '_acm', ctx)

    async def _teardown():
        # Exit the context manager when the proxy is fini()'d.
        await prox._acm.__aexit__(None, None, None)

    prox.onfini(_teardown)
    return prox
async def getTempCoreStorm(mods=None, outp=None):
    '''
    Get a StormCore instance which is backed by a temporary Cortex.

    Args:
        mods (list): A list of additional CoreModules to load in the Cortex.
        outp: A output helper. Will be used for the Storm CLI instance.

    Notes:
        The StormCore returned by this should be fini()'d to tear down the temporary Cortex.

    Returns:
        StormCore: A StormCore instance.
    '''
    acm = genTempCoreProxy(mods)
    prox = await acm.__aenter__()

    try:
        stormcore = await StormCore.anit(prox, outp=outp)
    except BaseException:
        # The StormCore never took ownership of the context manager, so exit
        # it here instead of leaking the temporary Cortex, then re-raise.
        await acm.__aexit__(None, None, None)
        raise

    # From here on, fini() of the StormCore exits the context manager.
    stormcore.acm = acm
    return stormcore
async def getTempCoreCmdr(mods=None, outp=None):
    '''
    Get a CmdrCore instance which is backed by a temporary Cortex.

    Args:
        mods (list): A list of additional CoreModules to load in the Cortex.
        outp: A output helper. Will be used for the Cmdr instance.

    Notes:
        The CmdrCore returned by this should be fini()'d to tear down the temporary Cortex.

    Returns:
        CmdrCore: A CmdrCore instance.
    '''
    acm = genTempCoreProxy(mods)
    prox = await acm.__aenter__()

    try:
        cmdrcore = await CmdrCore.anit(prox, outp=outp)
    except BaseException:
        # The CmdrCore never took ownership of the context manager, so exit
        # it here instead of leaking the temporary Cortex, then re-raise.
        await acm.__aexit__(None, None, None)
        raise

    # From here on, fini() of the CmdrCore exits the context manager.
    cmdrcore.acm = acm
    return cmdrcore
async def getTempCoreCmdrStormsvc(svcname, svcctor, svcconf=None, outp=None):
    '''
    Get a proxy to a Storm service and a CmdrCore instance backed by a temporary Cortex with the service added.

    Args:
        svcname (str): Storm service name
        svcctor: Storm service constructor (e.g. Example.anit)
        svcconf: Optional conf for the Storm service
        outp: A output helper for the Cmdr instance

    Notes:
        Both the CmdrCore and Storm service proxy should be fini()'d for proper teardown

    Returns:
        (CmdrCore, Proxy): A CmdrCore instance and proxy to the Storm service
    '''
    cmdrcore = await getTempCoreCmdr(outp=outp)

    acm = genTempStormsvcProxy(cmdrcore, svcname, svcctor, svcconf)
    try:
        svcprox = await acm.__aenter__()
    except BaseException:
        # The caller never receives the CmdrCore when service startup fails,
        # so tear it down here instead of leaking it, then re-raise.
        await cmdrcore.fini()
        raise

    # Stash the context manager on the proxy with object.__setattr__ so the
    # proxy's own attribute magic is bypassed.
    object.__setattr__(svcprox, '_acm', acm)

    async def onfini():
        # Exit the service context manager when the proxy is fini()'d.
        await svcprox._acm.__aexit__(None, None, None)

    svcprox.onfini(onfini)

    return cmdrcore, svcprox
async def getTempCoreStormStormsvc(svcname, svcctor, svcconf=None, outp=None):
    '''
    Get a proxy to a Storm service and a StormCore instance backed by a temporary Cortex with the service added.

    Args:
        svcname (str): Storm service name
        svcctor: Storm service constructor (e.g. Example.anit)
        svcconf: Optional conf for the Storm service
        outp: A output helper for the Storm CLI instance

    Notes:
        Both the StormCore and Storm service proxy should be fini()'d for proper teardown

    Returns:
        (StormCore, Proxy): A StormCore instance and proxy to the Storm service
    '''
    stormcore = await getTempCoreStorm(outp=outp)

    acm = genTempStormsvcProxy(stormcore, svcname, svcctor, svcconf)
    try:
        svcprox = await acm.__aenter__()
    except BaseException:
        # The caller never receives the StormCore when service startup fails,
        # so tear it down here instead of leaking it, then re-raise.
        await stormcore.fini()
        raise

    # Stash the context manager on the proxy with object.__setattr__ so the
    # proxy's own attribute magic is bypassed.
    object.__setattr__(svcprox, '_acm', acm)

    async def onfini():
        # Exit the service context manager when the proxy is fini()'d.
        await svcprox._acm.__aexit__(None, None, None)

    svcprox.onfini(onfini)

    return stormcore, svcprox