import os
import sys
import copy
import shlex
import pprint
import logging
import argparse
import contextlib
import subprocess
import collections
import vcr
import regex
from unittest import mock
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.base as s_base
import synapse.lib.json as s_json
import synapse.lib.output as s_output
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.stormhttp as s_stormhttp
import synapse.cmds.cortex as s_cmds_cortex
import synapse.tools.storm as s_storm
import synapse.tools.genpkg as s_genpkg
logger = logging.getLogger(__name__)

# Matches a directive line of the form ".. <name>:: <text>". Group 1 is the
# directive name (anything beginning with "shell" or "storm", or one single
# non-colon character); group 2 is the optional text following the "::".
re_directive = regex.compile(r'^\.\.\s(shell.*|storm.*|[^:])::(?:\s(.*)$|$)')

# Timeout handed to the package onload waiters; the SYNDEV_PKG_LOAD_TIMEOUT
# environment variable overrides the default of 30.
ONLOAD_TIMEOUT = int(os.environ.get('SYNDEV_PKG_LOAD_TIMEOUT', 30))  # seconds
[docs]
class OutPutRst(s_output.OutPutStr):
    '''
    Output helper for text that will be embedded in an RST document as an
    indented literal block.
    '''
    prefix = ' '

    def printf(self, mesg, addnl=True):
        '''
        Print a message, prefixing every line after the first with ``prefix``.

        Args:
            mesg (str): The text to print.
            addnl (bool): Passed through to ``OutPutStr.printf``.

        Returns:
            Whatever ``OutPutStr.printf`` returns.
        '''
        head, newline, tail = mesg.partition('\n')
        if newline:
            logger.debug(f'Newline found in [{mesg}]')
            # The first line is left alone; only continuation lines get the prefix.
            indented = '\n'.join(self.prefix + piece for piece in tail.split('\n'))
            mesg = f'{head}\n{indented}'
        return s_output.OutPutStr.printf(self, mesg, addnl)
[docs]
class StormOutput(s_cmds_cortex.StormCmd):
    '''
    Turn the message stream of a storm query into printable output lines.

    An instance is meant to be used for one query only and needs the rstorm
    context dictionary.
    '''

    _cmd_syntax = (
        ('--hide-query', {}),
    ) + s_cmds_cortex.StormCmd._cmd_syntax

    # Message types which produce no output by default.
    _silenced = ('init', 'fini', 'node:edits', 'node:edits:count')

    def __init__(self, core, ctx, stormopts=None, opts=None):
        '''
        Args:
            core: The object whose ``storm()`` method is used to run queries.
            ctx (dict): The rstorm context shared between directives.
            stormopts (dict): Optional storm opts used as the base for each query.
            opts (dict): Optional keyword arguments for ``StormCmd.__init__``.
        '''
        cmdopts = {} if opts is None else opts
        s_cmds_cortex.StormCmd.__init__(self, None, **cmdopts)

        self.core = core
        self.ctx = ctx
        self.lines = []
        self.prefix = ' '
        self.stormopts = stormopts or {}

        for mtype in self._silenced:
            self.cmdmeths[mtype] = self._silence

    async def runCmdLine(self, line):
        '''
        Parse ``line`` as the arguments of a ``storm`` command and execute it.

        Args:
            line (str): The storm command arguments / query text.

        Returns:
            str: The accumulated output, see ``runCmdOpts``.
        '''
        cmdopts = self.getCmdOpts(f'storm {line}')
        return await self.runCmdOpts(cmdopts)

    def _printNodeProp(self, name, valu):
        '''
        Print a property as "name = valu", aligning the continuation lines of
        multi-line values underneath the first line of the value.
        '''
        lead = f' {name} = '
        if '\n' not in valu:
            self.printf(f'{lead}{valu}')
            return

        first, *rest = valu.split('\n')
        self.printf(f'{lead}{first}')

        pad = ' ' * len(lead)
        for extra in rest:
            self.printf(f'{pad}{extra}')

    async def _mockHttp(self, *args, **kwargs):
        '''
        Stand-in for ``LibHttp._httpRequest`` which answers from the
        ``mock-http`` context value rather than performing a request.
        '''
        resp = self.ctx.get('mock-http')
        if not resp:
            return s_stormhttp.HttpResp({'code': 200, 'body': '{}'})

        body = resp.get('body')
        if isinstance(body, (dict, list)):
            body = s_json.dumps(body)
        elif isinstance(body, str):
            body = body.encode()

        return s_stormhttp.HttpResp({'code': resp.get('code', 200), 'body': body})

    @contextlib.contextmanager
    def _shimHttpCalls(self, vcr_kwargs):
        '''
        Context manager which intercepts HTTP calls while a query runs.

        Nothing is shimmed unless ``mock-http-path`` is set in the context.
        When the file at that path loads as non-empty JSON it is served via
        ``_mockHttp`` (legacy behavior); otherwise the path is used as a vcr
        cassette.

        Args:
            vcr_kwargs (dict): Optional keyword arguments for ``vcr.VCR``.
        '''
        mockpath = self.ctx.get('mock-http-path')
        if not mockpath:
            yield
            return

        vcropts = vcr_kwargs or {}
        mockpath = os.path.abspath(mockpath)

        # try it as json first (since yaml can load json...). if it parses, we're old school
        # if it doesn't, either it doesn't exist/we can't read it/we can't parse it.
        # in any of those cases, default to using vcr
        try:
            with open(mockpath, 'r') as fd:
                mockdata = s_json.load(fd)
        except (FileNotFoundError, s_exc.BadJsonText):
            mockdata = None

        if mockdata:  # backwards compat
            if not os.path.isfile(mockpath):
                raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=mockpath)

            self.ctx['mock-http'] = mockdata
            with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp):
                yield
            return

        recorder = vcr.VCR(**vcropts)
        callback = self.ctx.get('storm-vcr-callback', None)
        if callback:
            # Give the configured callback a chance to customize the recorder.
            callback(recorder)

        with recorder.use_cassette(mockpath) as cass:
            yield cass
            self.ctx.pop('mock-http-path', None)

    def printf(self, mesg, addnl=True, color=None):
        '''
        Record a message as an output line with ``self.prefix`` applied to each
        of its lines.

        Args:
            mesg (str): The text to record.
            addnl (bool): Accepted for signature compatibility; not used here.
            color: Accepted for signature compatibility; not used here.

        Returns:
            str: The line which was appended to ``self.lines``.
        '''
        line = f'{self.prefix}{mesg}'
        head, newline, tail = line.partition('\n')
        if newline:
            logger.debug(f'Newline found in [{mesg}]')
            indented = '\n'.join(self.prefix + piece for piece in tail.split('\n'))
            line = f'{head}\n{indented}'

        self.lines.append(line)
        return line

    def _silence(self, mesg, opts):
        '''
        Message handler which intentionally produces no output.
        '''
        return None

    def _onErr(self, mesg, opts):
        '''
        Handle an error message: print it when the ``storm-fail`` context flag
        is set (consuming the flag), otherwise raise StormRuntimeError.
        '''
        expected = self.ctx.pop('storm-fail', None)
        if not expected:
            # raise on err for rst
            errname, errinfo = mesg[1]
            errinfo.setdefault('_errname', errname)
            raise s_exc.StormRuntimeError(**errinfo)

        s_cmds_cortex.StormCmd._onErr(self, mesg, opts)

    async def runCmdOpts(self, opts):
        '''
        Execute the query described by ``opts`` and render its messages.

        Args:
            opts (dict): Parsed command options; ``query``, ``hide-query``,
                ``path`` and ``debug`` are read here.

        Returns:
            str: All lines recorded so far, joined with newlines.
        '''
        query = opts.get('query')

        if not opts.get('hide-query'):
            self.printf(f'> {query}')

        # Work on a copy so the defaults below never leak into the shared opts.
        runopts = copy.deepcopy(self.stormopts)
        runopts.setdefault('repr', True)
        runopts.setdefault('path', opts.get('path', False))

        hide_unknown = True

        # Let this raise on any errors
        with self._shimHttpCalls(self.ctx.get('storm-vcr-opts')):

            async for mesg in self.core.storm(query, opts=runopts):

                if opts.get('debug'):
                    self.printf(pprint.pformat(mesg))
                    continue

                try:
                    handler = self.cmdmeths[mesg[0]]
                except KeyError:  # pragma: no cover
                    if not hide_unknown:
                        self.printf(repr(mesg), color=s_cmds_cortex.UNKNOWN_COLOR)
                    continue

                handler(mesg, opts)

        return '\n'.join(self.lines)
[docs]
class StormCliOutput(s_storm.StormCli):
    '''
    StormCli variant whose output is indented for an RST literal block and
    which raises on storm errors unless a failure was declared as expected.
    '''

    async def __anit__(self, item, outp=s_output.stdout, opts=None):
        await s_storm.StormCli.__anit__(self, item, outp, opts)

        self.ctx = {}

        skips = (
            'init',
            'fini',
            'prov:new',  # TODO: Remove in v3.0.0
            'node:edits',
            'node:edits:count',
        )
        for mtype in skips:
            self._print_skips.append(mtype)

    def printf(self, mesg, addnl=True, color=None):
        '''
        Print a message with a leading space via ``StormCli.printf``.
        '''
        s_storm.StormCli.printf(self, f' {mesg}', addnl, color)

    async def handleErr(self, mesg):
        '''
        Handle an error message: defer to ``StormCli.handleErr`` when the
        ``storm-fail`` context flag is set (consuming the flag), otherwise
        raise StormRuntimeError.
        '''
        expected = self.ctx.pop('storm-fail', None)
        if not expected:
            # raise on err for rst
            raise s_exc.StormRuntimeError(mesg=mesg)

        await s_storm.StormCli.handleErr(self, mesg)

    def _printNodeProp(self, name, valu):
        '''
        Print a property as "name = valu", aligning the continuation lines of
        multi-line values underneath the first line of the value.
        '''
        lead = f' {name} = '
        if '\n' not in valu:
            self.printf(f'{lead}{valu}')
            return

        first, *rest = valu.split('\n')
        self.printf(f'{lead}{first}')

        pad = ' ' * len(lead)
        for extra in rest:
            self.printf(f'{pad}{extra}')

    async def _mockHttp(self, *args, **kwargs):
        '''
        Stand-in for ``LibHttp._httpRequest`` which answers from the
        ``mock-http`` context value rather than performing a request.
        '''
        resp = self.ctx.get('mock-http')
        if not resp:
            return s_stormhttp.HttpResp({'code': 200, 'body': '{}'})

        body = resp.get('body')
        if isinstance(body, (dict, list)):
            body = s_json.dumps(body)
        elif isinstance(body, str):
            body = body.encode()

        return s_stormhttp.HttpResp({'code': resp.get('code', 200), 'body': body})

    @contextlib.contextmanager
    def _shimHttpCalls(self, vcr_kwargs):
        '''
        Context manager which intercepts HTTP calls while a command runs.

        Nothing is shimmed unless ``mock-http-path`` is set in the context.
        When the file at that path loads as non-empty JSON it is served via
        ``_mockHttp`` (legacy behavior); otherwise the path is used as a vcr
        cassette.

        Args:
            vcr_kwargs (dict): Optional keyword arguments for ``vcr.use_cassette``.
        '''
        mockpath = self.ctx.get('mock-http-path')
        if not mockpath:
            yield
            return

        vcropts = vcr_kwargs or {}
        mockpath = os.path.abspath(mockpath)

        # try it as json first (since yaml can load json...). if it parses, we're old school
        # if it doesn't, either it doesn't exist/we can't read it/we can't parse it.
        # in any of those cases, default to using vcr
        try:
            with open(mockpath, 'r') as fd:
                mockdata = s_json.load(fd)
        except (FileNotFoundError, s_exc.BadJsonText):
            mockdata = None

        if mockdata:  # backwards compat
            if not os.path.isfile(mockpath):
                raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=mockpath)

            self.ctx['mock-http'] = mockdata
            with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp):
                yield
            return

        with vcr.use_cassette(mockpath, **vcropts) as cass:
            yield cass
            self.ctx.pop('mock-http-path', None)

    async def runRstCmdLine(self, text, ctx, stormopts=None):
        '''
        Run one command line and return everything printed to the output object.

        Args:
            text (str): The command line to execute.
            ctx (dict): The rstorm context; replaces ``self.ctx``.
            stormopts (dict): Optional opts forwarded to ``runCmdLine``.

        Returns:
            str: ``str()`` of the output object after the command has run.
        '''
        self.ctx = ctx

        # Echo the prompt and command so the rendered block looks like a CLI session.
        self.printf(self.cmdprompt + text)

        vcropts = self.ctx.get('storm-vcr-opts')
        with self._shimHttpCalls(vcropts):
            await self.runCmdLine(text, opts=stormopts)

        return str(self.outp)
[docs]
@contextlib.asynccontextmanager
async def getCell(ctor, conf):
    '''
    Async context manager yielding a cell built from a ctor path.

    The cell is constructed in a temporary directory and both the cell and the
    directory contexts are exited when the manager exits.

    Args:
        ctor (str): The name handed to ``s_dyndeps.getDynLocal`` to find the class.
        conf (dict): The configuration passed to the ctor's ``anit``.

    Raises:
        s_exc.NoSuchCtor: If the ctor cannot be resolved.
    '''
    cellctor = s_dyndeps.getDynLocal(ctor)
    if cellctor is None:
        raise s_exc.NoSuchCtor(mesg=f'Unable to resolve ctor [{ctor}]', ctor=ctor)

    with s_common.getTempDir() as tmpdir:
        async with await cellctor.anit(tmpdir, conf=conf) as cell:
            yield cell
[docs]
class StormRst(s_base.Base):
    '''
    Process an RST file which contains storm/shell directives.

    Lines matching ``re_directive`` are dispatched to a handler which may run
    queries or commands and emit output; all other lines are passed through.
    The resulting lines are collected in ``self.linesout``.
    '''

    async def __anit__(self, rstfile):
        '''
        Args:
            rstfile (str): Path to the RST file to process.

        Raises:
            s_exc.BadConfValu: If the (normalized) path is not an existing file.
        '''
        await s_base.Base.__anit__(self)

        self.rstfile = s_common.genpath(rstfile)

        # Validate the normalized path, since that is the one run() opens and
        # the one reported in the error.
        if not os.path.isfile(self.rstfile):
            raise s_exc.BadConfValu(mesg='A valid rstfile must be specified', rstfile=self.rstfile)

        self.linesout = []   # output lines accumulated by _printf
        self.context = {}    # state shared between directives
        self.stormvars = {}  # vars mapped into storm-pre queries by storm-envvar

        self.core = None

        self.handlers = {
            'shell': self._handleShell,
            'shell-env': self._handleShellEnv,
            'storm': self._handleStorm,
            'storm-cli': self._handleStormCli,
            'storm-pkg': self._handleStormPkg,
            'storm-pre': self._handleStormPre,
            'storm-svc': self._handleStormSvc,
            'storm-fail': self._handleStormFail,
            'storm-opts': self._handleStormOpts,
            'storm-cortex': self._handleStormCortex,
            'storm-envvar': self._handleStormEnvVar,
            'storm-expect': self._handleStormExpect,
            'storm-python-path': self._handlePythonPath,
            'storm-multiline': self._handleStormMultiline,
            'storm-mock-http': self._handleStormMockHttp,
            'storm-vcr-opts': self._handleStormVcrOpts,
            'storm-clear-http': self._handleStormClearHttp,
            'storm-vcr-callback': self._handleStormVcrCallback,
        }

    async def _getCell(self, ctor, conf=None):
        '''
        Construct a cell via getCell() and tie its lifetime to this object.

        Args:
            ctor (str): The ctor path of the cell.
            conf (dict): Optional cell configuration; defaults to an empty dict.
        '''
        if conf is None:
            conf = {}

        cell = await self.enter_context(getCell(ctor, conf))

        return cell

    def _printf(self, line):
        '''
        Append a line to the output.
        '''
        self.linesout.append(line)

    def _reqCore(self):
        '''
        Return the current cortex or raise NoSuchVar if none has been set.
        '''
        if self.core is None:
            mesg = 'No cortex set.  Use .. storm-cortex::'
            raise s_exc.NoSuchVar(mesg=mesg)
        return self.core

    def _getHandler(self, directive):
        '''
        Return the handler for a directive name or raise NoSuchName.
        '''
        handler = self.handlers.get(directive)
        if handler is None:
            raise s_exc.NoSuchName(mesg=f'The {directive} directive is not supported', directive=directive)
        return handler

    async def _handlePythonPath(self, text):
        '''
        Add the text to sys.path.

        The entry is removed again when this object is fini'd, but only if it
        was added here.

        Args:
            text: The path to add.
        '''
        if not os.path.isdir(text):
            raise s_exc.NoSuchDir(mesg=f'The path {text} is not a directory', path=text)

        if text not in sys.path:
            logger.debug(f'Inserting {text} into sys.path')
            sys.path.insert(0, text)

            def onfini():
                sys.path.remove(text)

            self.onfini(onfini)

    async def _handleStorm(self, text):
        '''
        Run a Storm command and generate text from the output.

        Args:
            text (str): A valid Storm query.
        '''
        core = self._reqCore()
        text = self._getStormMultiline(text)

        self._printf('::\n')
        self._printf('\n')

        soutp = StormOutput(core, self.context, stormopts=self.context.get('storm-opts'))
        self._printf(await soutp.runCmdLine(text))

        # The error handler consumes storm-fail; if it is still set, no error happened.
        if self.context.pop('storm-fail', None):
            raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.')

        self._printf('\n\n')

    async def _handleStormCli(self, text):
        '''
        Run a Storm CLI command line and generate text from the output.

        Args:
            text (str): The command line, or a MULTILINE=<KEY> reference.
        '''
        core = self._reqCore()
        outp = OutPutRst()
        text = self._getStormMultiline(text)

        self._printf('::\n')
        self._printf('\n')

        cli = await StormCliOutput.anit(item=core, outp=outp)

        self._printf(await cli.runRstCmdLine(text, self.context, stormopts=self.context.get('storm-opts')))

        # The error handler consumes storm-fail; if it is still set, no error happened.
        if self.context.pop('storm-fail', None):
            raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.')

        self._printf('\n')

    async def _handleStormPkg(self, text):
        '''
        Load a Storm package into the Cortex by path.

        Args:
            text (str): The path to a Storm package YAML file.
        '''
        if not os.path.isfile(text):
            raise s_exc.NoSuchFile(mesg='Storm Package filepath does not exist', path=text)

        core = self._reqCore()

        pkg = s_genpkg.loadPkgProto(text)

        # Create the waiter before adding the package so the event cannot be missed.
        if pkg.get('onload') is not None:
            waiter = core.waiter(1, 'core:pkg:onload:complete')
        else:
            waiter = None

        await core.addStormPkg(pkg)

        if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT):
            raise s_exc.SynErr(mesg=f'Package onload failed to run for {pkg.get("name")}')

    async def _handleStormPre(self, text):
        '''
        Run a Storm query to prepare the Cortex without output.

        Args:
            text (str): A valid Storm query
        '''
        core = self._reqCore()

        self.context.setdefault('storm-opts', {})
        stormopts = self.context.get('storm-opts')
        stormopts.setdefault('vars', {})

        # only map env vars in for storm-pre
        stormopts = copy.deepcopy(stormopts)
        stormopts['vars'].update(self.stormvars)

        soutp = StormOutput(core, self.context, stormopts=stormopts)
        await soutp.runCmdLine(text)

        self.context.pop('storm-fail', None)

    async def _handleStormSvc(self, text):
        '''
        Load a Storm service by ctor and add to the Cortex.

        Args:
            text (str): <ctor> <svcname> <optional JSON string to use as svcconf>
        '''
        core = self._reqCore()

        splts = text.split(' ', 2)
        ctor, svcname = splts[:2]
        svcconf = s_json.loads(splts[2].strip()) if len(splts) == 3 else {}

        svc = await self._getCell(ctor, conf=svcconf)

        # Create the waiter before adding the service so no onload event is missed.
        onloadcnt = len([p for p in svc.cellapi._storm_svc_pkgs if p.get('onload') is not None])
        waiter = core.waiter(onloadcnt, 'core:pkg:onload:complete') if onloadcnt else None

        svc.dmon.share('svc', svc)
        root = await svc.auth.getUserByName('root')
        await root.setPasswd('root')
        info = await svc.dmon.listen('tcp://127.0.0.1:0/')
        svc.dmon.test_addr = info
        host, port = info

        # Connect back as root, using the password set above, on the
        # dynamically assigned loopback port.
        surl = f'tcp://root:root@127.0.0.1:{port}/svc'
        await core.nodes(f'service.add {svcname} {surl}')
        await core.nodes(f'$lib.service.wait({svcname})')

        if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT):
            raise s_exc.SynErr(mesg=f'Package onload failed to run for service {svcname}')

    async def _handleStormFail(self, text):
        '''
        Declare whether the next storm directive is expected to fail.

        Args:
            text (str): A JSON boolean.
        '''
        valu = s_json.loads(text)
        assert valu in (True, False), f'storm-fail must be a boolean: {text}'
        self.context['storm-fail'] = valu

    def _getStormMultiline(self, text):
        '''
        Resolve a MULTILINE=<KEY> reference to the text stored by the
        storm-multiline directive; any other text is returned unchanged.
        '''
        if '=' in text:
            sentinel, key = text.split('=', 1)
            if sentinel != 'MULTILINE':
                return text
            ret = self.context.get('multiline', {}).get(key)
            assert ret is not None, f'Invalid multiline text: {text}'
            return ret
        return text

    async def _handleStormMultiline(self, text):
        '''
        Store a multi-line text under a key for later MULTILINE=<KEY> use.

        Args:
            text (str): <KEY>=<JSON string>, where KEY must be upper case.
        '''
        key, valu = text.split('=', 1)
        assert key.isupper()
        valu = s_json.loads(valu)
        assert isinstance(valu, str)
        multi = self.context.get('multiline', {})
        multi[key] = valu
        self.context['multiline'] = multi

    async def _handleStormOpts(self, text):
        '''
        Opts to use in subsequent Storm queries.

        Args:
            text (str): JSON string, e.g. {"vars": {"foo": "bar"}}
        '''
        item = s_json.loads(text)
        self.context['storm-opts'] = item

    async def _handleStormClearHttp(self, text):
        '''
        Reset the storm http context and any associated opts with it.

        Args:
            text (str): true if you also want to clear any storm/vcr opts as well
        '''
        if text == 'true':
            self.context.pop('storm-opts', None)
            self.context.pop('storm-vcr-opts', None)
            self.context.pop('storm-vcr-callback', None)

        self.context.pop('mock-http-path', None)
        self.context.pop('mock-http', None)

    async def _handleStormCortex(self, text):
        '''
        Spin up a default Cortex if ctor=default, else load the defined ctor.

        Any previously created Cortex is fini'd first.

        TODO: Handle providing conf in text

        Args:
            text (str): "default" or a ctor (e.g. synapse.cortex.Cortex)
        '''
        if self.core is not None:
            await self.core.fini()
            self.core = None

        ctor = 'synapse.cortex.Cortex' if text == 'default' else text

        self.core = await self._getCell(ctor)

    async def _handleStormEnvVar(self, text):
        '''
        Define a var for storm-pre queries from an environment variable.

        Args:
            text (str): <NAME>=<default>; the default is used when the
                environment variable NAME is not set.
        '''
        name, valu = text.split('=', 1)
        name = name.strip()
        valu = valu.strip()
        self.stormvars[name] = os.getenv(name, valu)

    async def _handleStormExpect(self, text):
        # TODO handle some light weight output confirmation.
        return

    async def _handleStormMockHttp(self, text):
        '''
        Setup an HTTP mock file to be used with a later Storm command.

        Response file format:
        {
            "code": int,
            "body": {
                "data": json or a json str
            }
        }

        Args:
            text (str): Path to a json file with the response.
        '''
        self.context['mock-http-path'] = text

    async def _handleStormVcrOpts(self, text):
        '''
        Opts to pass to VCRPY for use in generating docs

        Args:
            text (str): JSON string, e.g. {"filter_query_args": true}
        '''
        item = s_json.loads(text)
        self.context['storm-vcr-opts'] = item

    async def _handleStormVcrCallback(self, text):
        '''
        Get a callback function as a dynlocal

        Args:
            text (str): The name handed to ``s_dyndeps.getDynLocal``.

        Raises:
            s_exc.NoSuchCtor: If the callback cannot be resolved.
        '''
        cb = s_dyndeps.getDynLocal(text)
        if cb is None:
            raise s_exc.NoSuchCtor(mesg=f'Failed to get callback "{text}"', ctor=text)
        self.context['storm-vcr-callback'] = cb

    async def _handleShell(self, text):
        '''
        Execute shell with the supplied arguments.

        Args:
            text (str): The command line, optionally containing the
                --include-stderr, --hide-query and --fail-ok flags.

        Raises:
            s_exc.SynErr: On a non-zero return code unless --fail-ok is given.
        '''
        parser = argparse.ArgumentParser(add_help=False)
        parser.add_argument('--include-stderr', action='store_true', help='Include stderr in output.')
        parser.add_argument('--hide-query', action='store_true', help='Do not include the command in the output.')
        parser.add_argument('--fail-ok', action='store_true', help='Non-zero return values are non-fatal.')

        # The args which are not directive flags form the argv of the command.
        opts, args = parser.parse_known_args(shlex.split(text))

        # Remove any command line arguments
        query = text
        query = query.replace('--include-stderr', '')
        query = query.replace('--hide-query', '')
        query = query.replace('--fail-ok', '')
        query = query.strip()

        env = dict(os.environ)
        env.update(self.context.get('shell-env', {}))

        stderr = None
        if opts.include_stderr:
            stderr = subprocess.STDOUT

        proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=stderr, env=env, text=True)

        if proc.returncode != 0 and not opts.fail_ok:
            mesg = f'Error when executing shell directive: {text} (rv: {proc.returncode})'
            raise s_exc.SynErr(mesg=mesg)

        self._printf('::\n\n')

        if not opts.hide_query:
            self._printf(f' {query}\n\n')

        for line in proc.stdout.splitlines():
            self._printf(f' {line}\n')

        self._printf('\n\n')

    async def _handleShellEnv(self, text):
        '''
        Env to use in subsequent shell queries.

        Args:
            text (str): [KEY=VALUE ...]

        Note: No arguments will reset the shell environment.
        '''
        text = text.strip()
        if not text:
            # Resetting an environment which was never set is not an error.
            return self.context.pop('shell-env', None)

        env = {}

        # Split on any run of whitespace, and only on the first "=" so that
        # values may themselves contain "=".
        for item in text.split():
            key, val = item.split('=', 1)
            env[key] = val

        self.context['shell-env'] = env

    async def _readline(self, line):
        '''
        Process one input line: run its directive handler if it is a supported
        directive line, otherwise pass the line through to the output.
        '''
        match = re_directive.match(line)

        if match is not None:
            directive, text = match.groups()

            # The text group is None when nothing at all follows the "::",
            # e.g. on a final line without a trailing newline.
            text = (text or '').strip()

            handler = self._getHandler(directive)
            logger.debug(f'Executing {directive} -> {text}')
            await handler(text)
            return

        self._printf(line)

    async def run(self):
        '''
        Parses the specified RST file with Storm directive handling.

        Returns:
            list: List of line strings for the RST output
        '''
        with open(self.rstfile, 'r') as fd:
            lines = fd.readlines()

        for line in lines:
            await self._readline(line)

        return self.linesout