Source code for synapse.lib.rstorm

import os
import sys
import copy
import shlex
import pprint
import logging
import argparse
import contextlib
import subprocess
import collections

import vcr
import regex

from unittest import mock

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.json as s_json
import synapse.lib.output as s_output
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.stormhttp as s_stormhttp

import synapse.cmds.cortex as s_cmds_cortex

import synapse.tools.storm as s_storm
import synapse.tools.genpkg as s_genpkg


# Matches an RST directive line handled by this module: a leading "..", one
# whitespace character, a directive name, then "::". Group 1 is the directive
# name (anything starting with "shell" or "storm", or a single non-colon
# character). Group 2 is the text following one whitespace character after
# the "::"; it is None when the line ends immediately after the "::".
re_directive = regex.compile(r'^\.\.\s(shell.*|storm.*|[^:])::(?:\s(.*)$|$)')

logger = logging.getLogger(__name__)

# Timeout passed to waiter.wait() when waiting for 'core:pkg:onload:complete'
# events; may be overridden with the SYNDEV_PKG_LOAD_TIMEOUT environment variable.
ONLOAD_TIMEOUT = int(os.getenv('SYNDEV_PKG_LOAD_TIMEOUT', 30))  # seconds

class OutPutRst(s_output.OutPutStr):
    '''
    Output helper for text which will be embedded in an RST literal block.

    When a message spans several lines, every line after the first is
    prefixed with ``prefix`` before being handed to the parent class.
    '''
    prefix = ' '

    def printf(self, mesg, addnl=True):
        '''
        Print a message, prefixing the continuation lines of multi-line text.

        Args:
            mesg (str): The message to print.
            addnl (bool): Passed through to ``OutPutStr.printf``.

        Returns:
            The return value of ``OutPutStr.printf``.
        '''
        head, newline, tail = mesg.partition('\n')

        if newline:
            logger.debug(f'Newline found in [{mesg}]')
            # The first line is left alone; only the following lines get the prefix.
            indented = [self.prefix + chunk for chunk in tail.split('\n')]
            mesg = '\n'.join([head] + indented)

        return s_output.OutPutStr.printf(self, mesg, addnl)
class StormOutput(s_cmds_cortex.StormCmd):
    '''
    Render the stream of Storm runtime messages for one query as text lines.

    An instance is meant to be used for a single query and is given the
    rstorm context dictionary so directives such as storm-fail and the HTTP
    mocking options can influence how the query is run.
    '''

    _cmd_syntax = (
        ('--hide-query', {}),
    ) + s_cmds_cortex.StormCmd._cmd_syntax

    def __init__(self, core, ctx, stormopts=None, opts=None):
        '''
        Args:
            core: The object whose ``storm()`` method is used to run the query.
            ctx (dict): The shared rstorm context dictionary.
            stormopts (dict): Storm opts to (deep) copy for each query.
            opts (dict): Keyword arguments forwarded to ``StormCmd.__init__``.
        '''
        cmdkwargs = {} if opts is None else opts
        s_cmds_cortex.StormCmd.__init__(self, None, **cmdkwargs)

        self.ctx = ctx
        self.core = core
        self.lines = []
        self.prefix = ' '
        self.stormopts = stormopts or {}

        # hide a few mesg types by default
        silenced = ('init', 'fini', 'node:edits', 'node:edits:count')
        for mtype in silenced:
            self.cmdmeths[mtype] = self._silence

    async def runCmdLine(self, line):
        '''
        Parse ``line`` as the arguments of a storm command and run it.

        Returns:
            str: The accumulated output text (see ``runCmdOpts``).
        '''
        return await self.runCmdOpts(self.getCmdOpts(f'storm {line}'))

    def _printNodeProp(self, name, valu):
        # Print "<name> = <valu>", aligning the continuation lines of a
        # multi-line value underneath the start of the value.
        label = f' {name} = '

        if '\n' not in valu:
            self.printf(f'{label}{valu}')
            return

        first, *remaining = valu.split('\n')
        padding = ' ' * len(label)

        self.printf(f'{label}{first}')
        for extra in remaining:
            self.printf(f'{padding}{extra}')

    async def _mockHttp(self, *args, **kwargs):
        # Stand-in for LibHttp._httpRequest which answers from the 'mock-http'
        # context value, or with an empty 200 response when none is set.
        resp = self.ctx.get('mock-http')
        if not resp:
            return s_stormhttp.HttpResp({'code': 200, 'body': '{}'})

        body = resp.get('body')
        if isinstance(body, (dict, list)):
            body = s_json.dumps(body)
        elif isinstance(body, str):
            body = body.encode()

        return s_stormhttp.HttpResp({'code': resp.get('code', 200), 'body': body})

    @contextlib.contextmanager
    def _shimHttpCalls(self, vcr_kwargs):
        '''
        Context manager which mocks or records HTTP calls made during a query.

        Nothing is patched unless 'mock-http-path' is set in the context. If
        that file parses as non-empty JSON, it is used as a canned response;
        otherwise the path is used as a vcr cassette.

        Args:
            vcr_kwargs (dict): Keyword arguments for ``vcr.VCR``; may be None.
        '''
        path = self.ctx.get('mock-http-path')
        if not path:
            yield
            return

        recorder_opts = vcr_kwargs or {}
        path = os.path.abspath(path)

        # try it as json first (since yaml can load json...). if it parses, we're old school
        # if it doesn't, either it doesn't exist/we can't read it/we can't parse it.
        # in any of those cases, default to using vcr
        try:
            with open(path, 'r') as fd:
                canned = s_json.load(fd)
        except (FileNotFoundError, s_exc.BadJsonText):
            canned = None

        if canned:
            # backwards compat
            if not os.path.isfile(path):
                raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=path)

            self.ctx['mock-http'] = canned
            with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp):
                yield
            return

        recorder = vcr.VCR(**recorder_opts)

        callback = self.ctx.get('storm-vcr-callback', None)
        if callback:
            callback(recorder)

        with recorder.use_cassette(path) as cass:
            yield cass
            # Only reached when the wrapped body finished without raising.
            self.ctx.pop('mock-http-path', None)

    def printf(self, mesg, addnl=True, color=None):
        '''
        Record a prefixed output line instead of writing it anywhere.

        Args:
            mesg: The message to record.
            addnl (bool): Unused; accepted for signature compatibility.
            color: Unused; accepted for signature compatibility.

        Returns:
            str: The line which was appended to ``self.lines``.
        '''
        line = f'{self.prefix}{mesg}'

        head, newline, tail = line.partition('\n')
        if newline:
            logger.debug(f'Newline found in [{mesg}]')
            # The first line already carries the prefix; add it to the rest.
            indented = [self.prefix + chunk for chunk in tail.split('\n')]
            line = '\n'.join([head] + indented)

        self.lines.append(line)
        return line

    def _silence(self, mesg, opts):
        # Handler for message types which should produce no output.
        pass

    def _onErr(self, mesg, opts):
        # raise on err for rst, unless a failure was announced via storm-fail
        expected = self.ctx.pop('storm-fail', None)
        if not expected:
            errname, errinfo = mesg[1]
            errinfo.setdefault('_errname', errname)
            raise s_exc.StormRuntimeError(**errinfo)

        s_cmds_cortex.StormCmd._onErr(self, mesg, opts)

    async def runCmdOpts(self, opts):
        '''
        Run the query described by the parsed command opts.

        Args:
            opts (dict): Parsed command options; reads 'query', 'hide-query',
                'path' and 'debug'.

        Returns:
            str: All recorded output lines joined with newlines.
        '''
        text = opts.get('query')
        if not opts.get('hide-query'):
            self.printf(f'> {text}')

        stormopts = copy.deepcopy(self.stormopts)
        stormopts.setdefault('repr', True)
        stormopts.setdefault('path', opts.get('path', False))

        hide_unknown = True

        # Let this raise on any errors
        vcropts = self.ctx.get('storm-vcr-opts')
        with self._shimHttpCalls(vcropts):

            async for mesg in self.core.storm(text, opts=stormopts):

                if opts.get('debug'):
                    self.printf(pprint.pformat(mesg))
                    continue

                handler = self.cmdmeths.get(mesg[0])
                if handler is not None:
                    handler(mesg, opts)
                elif not hide_unknown:  # pragma: no cover
                    self.printf(repr(mesg), color=s_cmds_cortex.UNKNOWN_COLOR)

        return '\n'.join(self.lines)
class StormCliOutput(s_storm.StormCli):
    '''
    A StormCli variant used to render ``storm-cli`` directives into RST.

    Output lines are prefixed for inclusion in a literal block, several
    message types are skipped, and Storm errors raise unless a failure was
    announced through the 'storm-fail' context value.
    '''

    async def __anit__(self, item, outp=s_output.stdout, opts=None):
        '''
        Args:
            item: Forwarded to ``StormCli.__anit__``.
            outp: Output object forwarded to ``StormCli.__anit__``.
            opts: Forwarded to ``StormCli.__anit__``.
        '''
        await s_storm.StormCli.__anit__(self, item, outp, opts)
        self.ctx = {}
        # Message types appended to the parent's skip list.
        # NOTE(review): presumably these are never printed by StormCli - confirm.
        self._print_skips.append('init')
        self._print_skips.append('fini')
        self._print_skips.append('prov:new')  # TODO: Remove in v3.0.0
        self._print_skips.append('node:edits')
        self._print_skips.append('node:edits:count')

    def printf(self, mesg, addnl=True, color=None):
        '''
        Print a message with the literal block prefix applied.
        '''
        mesg = f' {mesg}'
        s_storm.StormCli.printf(self, mesg, addnl, color)

    async def handleErr(self, mesg):
        '''
        Handle an error message from the Storm runtime.

        Raises:
            s_exc.StormRuntimeError: When no failure was expected, i.e. the
                'storm-fail' context value is unset or false.
        '''
        # raise on err for rst
        if self.ctx.pop('storm-fail', None):
            await s_storm.StormCli.handleErr(self, mesg)
            return

        raise s_exc.StormRuntimeError(mesg=mesg)

    def _printNodeProp(self, name, valu):
        # Print "<name> = <valu>", aligning the continuation lines of a
        # multi-line value underneath the start of the value.
        base = f' {name} = '
        if '\n' in valu:
            parts = collections.deque(valu.split('\n'))
            ws = ' ' * len(base)
            self.printf(f'{base}{parts.popleft()}')
            while parts:
                part = parts.popleft()
                self.printf(f'{ws}{part}')
        else:
            self.printf(f'{base}{valu}')

    async def _mockHttp(self, *args, **kwargs):
        # Stand-in for LibHttp._httpRequest which answers from the 'mock-http'
        # context value, or with an empty 200 response when none is set.
        info = {
            'code': 200,
            'body': '{}',
        }

        resp = self.ctx.get('mock-http')
        if resp:
            body = resp.get('body')

            if isinstance(body, (dict, list)):
                body = s_json.dumps(body)
            elif isinstance(body, str):
                body = body.encode()

            info = {
                'code': resp.get('code', 200),
                'body': body,
            }

        return s_stormhttp.HttpResp(info)

    @contextlib.contextmanager
    def _shimHttpCalls(self, vcr_kwargs):
        '''
        Context manager which mocks or records HTTP calls made during a command.

        Nothing is patched unless 'mock-http-path' is set in the context. If
        that file parses as non-empty JSON, it is used as a canned response;
        otherwise the path is used as a vcr cassette. When a
        'storm-vcr-callback' is present in the context it is called with the
        recorder before the cassette is opened, matching the behavior of
        StormOutput._shimHttpCalls.

        Args:
            vcr_kwargs (dict): Keyword arguments for ``use_cassette``; may be None.
        '''
        path = self.ctx.get('mock-http-path')
        if not vcr_kwargs:
            vcr_kwargs = {}

        if path:
            path = os.path.abspath(path)
            # try it as json first (since yaml can load json...). if it parses, we're old school
            # if it doesn't, either it doesn't exist/we can't read it/we can't parse it.
            # in any of those cases, default to using vcr
            try:
                with open(path, 'r') as fd:
                    byts = s_json.load(fd)
            except (FileNotFoundError, s_exc.BadJsonText):
                byts = None

            if not byts:
                # Use an explicit recorder so a configured callback can adjust
                # it; the kwargs still go to use_cassette() exactly as before.
                recorder = vcr.VCR()
                vcrcb = self.ctx.get('storm-vcr-callback', None)
                if vcrcb:
                    vcrcb(recorder)

                with recorder.use_cassette(os.path.abspath(path), **vcr_kwargs) as cass:
                    yield cass
                    # Only reached when the wrapped body finished without raising.
                    self.ctx.pop('mock-http-path', None)
            else:  # backwards compat
                if not os.path.isfile(path):
                    raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=path)
                self.ctx['mock-http'] = byts
                with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp):
                    yield
        else:
            yield

    async def runRstCmdLine(self, text, ctx, stormopts=None):
        '''
        Run one command line and return the text captured by the output object.

        Args:
            text (str): The command line to run.
            ctx (dict): The rstorm context dictionary; replaces ``self.ctx``.
            stormopts (dict): Passed to ``runCmdLine`` as ``opts``.

        Returns:
            str: ``str(self.outp)`` after the command has run.
        '''
        self.ctx = ctx

        # Echo the prompt and command so the rendered block looks like a CLI session.
        self.printf(self.cmdprompt + text)

        with self._shimHttpCalls(self.ctx.get('storm-vcr-opts')):
            await self.runCmdLine(text, opts=stormopts)

        return str(self.outp)
@contextlib.asynccontextmanager
async def getCell(ctor, conf):
    '''
    Async context manager yielding a cell built from a ctor path.

    The cell is constructed in a temporary directory and is only valid
    inside the context.

    Args:
        ctor (str): The dynamic local path of the cell class to construct.
        conf (dict): The configuration passed to the cell constructor.

    Raises:
        s_exc.NoSuchCtor: If the ctor cannot be resolved.
    '''
    cellcls = s_dyndeps.getDynLocal(ctor)
    if cellcls is None:
        raise s_exc.NoSuchCtor(mesg=f'Unable to resolve ctor [{ctor}]', ctor=ctor)

    with s_common.getTempDir() as tmpdir:
        inst = await cellcls.anit(tmpdir, conf=conf)
        async with inst as cell:
            yield cell
class StormRst(s_base.Base):
    '''
    Process an RST file, executing the storm and shell directives found in it.

    Lines which are not recognized directives are copied to the output
    unchanged. Directive lines are replaced by whatever their handler prints.
    '''

    async def __anit__(self, rstfile):
        '''
        Args:
            rstfile (str): Path to the RST file to process.

        Raises:
            s_exc.BadConfValu: If the path does not refer to a file.
        '''
        await s_base.Base.__anit__(self)

        self.rstfile = s_common.genpath(rstfile)

        # Validate the same path that run() will open, not the raw argument.
        if not os.path.isfile(self.rstfile):
            raise s_exc.BadConfValu(mesg='A valid rstfile must be specified', rstfile=self.rstfile)

        self.linesout = []   # output lines accumulated by _printf()
        self.context = {}    # state shared between directives
        self.stormvars = {}  # variables collected by storm-envvar

        self.core = None     # set by the storm-cortex directive

        # Directive name -> coroutine taking the directive text.
        self.handlers = {
            'shell': self._handleShell,
            'shell-env': self._handleShellEnv,
            'storm': self._handleStorm,
            'storm-cli': self._handleStormCli,
            'storm-pkg': self._handleStormPkg,
            'storm-pre': self._handleStormPre,
            'storm-svc': self._handleStormSvc,
            'storm-fail': self._handleStormFail,
            'storm-opts': self._handleStormOpts,
            'storm-cortex': self._handleStormCortex,
            'storm-envvar': self._handleStormEnvVar,
            'storm-expect': self._handleStormExpect,
            'storm-python-path': self._handlePythonPath,
            'storm-multiline': self._handleStormMultiline,
            'storm-mock-http': self._handleStormMockHttp,
            'storm-vcr-opts': self._handleStormVcrOpts,
            'storm-clear-http': self._handleStormClearHttp,
            'storm-vcr-callback': self._handleStormVcrCallback,
        }

    async def _getCell(self, ctor, conf=None):
        # Construct a cell which is torn down along with this object.
        if conf is None:
            conf = {}

        cell = await self.enter_context(getCell(ctor, conf))

        return cell

    def _printf(self, line):
        # Append a line of text to the output.
        self.linesout.append(line)

    def _reqCore(self):
        # Return the current cortex or raise if storm-cortex has not been used.
        if self.core is None:
            mesg = 'No cortex set.  Use .. storm-cortex::'
            raise s_exc.NoSuchVar(mesg=mesg)
        return self.core

    def _getHandler(self, directive):
        # Look up the handler for a directive name or raise NoSuchName.
        handler = self.handlers.get(directive)
        if handler is None:
            raise s_exc.NoSuchName(mesg=f'The {directive} directive is not supported', directive=directive)
        return handler

    async def _handlePythonPath(self, text):
        '''
        Add the text to sys.path.

        The path is removed from sys.path again when this object is finalized.

        Args:
            text: The path to add.

        Raises:
            s_exc.NoSuchDir: If the path is not a directory.
        '''
        if not os.path.isdir(text):
            raise s_exc.NoSuchDir(mesg=f'The path {text} is not a directory', path=text)
        if text not in sys.path:
            logger.debug(f'Inserting {text} into sys.path')
            sys.path.insert(0, text)

            def onfini():
                sys.path.remove(text)

            self.onfini(onfini)

    async def _handleStorm(self, text):
        '''
        Run a Storm command and generate text from the output.

        Args:
            text (str): A valid Storm query, or MULTILINE=<key> to reference
                text stored by a storm-multiline directive.

        Raises:
            s_exc.StormRuntimeError: If a failure was expected via storm-fail
                but the query did not fail.
        '''
        core = self._reqCore()

        text = self._getStormMultiline(text)

        self._printf('::\n')
        self._printf('\n')

        soutp = StormOutput(core, self.context, stormopts=self.context.get('storm-opts'))
        self._printf(await soutp.runCmdLine(text))

        # The error handler pops storm-fail; if it is still set nothing failed.
        if self.context.pop('storm-fail', None):
            raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.')

        self._printf('\n\n')

    async def _handleStormCli(self, text):
        '''
        Run a Storm CLI command line and generate text from the output.

        Args:
            text (str): The command line, or MULTILINE=<key> to reference
                text stored by a storm-multiline directive.

        Raises:
            s_exc.StormRuntimeError: If a failure was expected via storm-fail
                but the command did not fail.
        '''
        core = self._reqCore()
        outp = OutPutRst()

        text = self._getStormMultiline(text)

        self._printf('::\n')
        self._printf('\n')

        cli = await StormCliOutput.anit(item=core, outp=outp)

        self._printf(await cli.runRstCmdLine(text, self.context, stormopts=self.context.get('storm-opts')))

        # The error handler pops storm-fail; if it is still set nothing failed.
        if self.context.pop('storm-fail', None):
            raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.')

        self._printf('\n')

    async def _handleStormPkg(self, text):
        '''
        Load a Storm package into the Cortex by path.

        Args:
            text (str): The path to a Storm package YAML file.

        Raises:
            s_exc.NoSuchFile: If the path is not a file.
            s_exc.SynErr: If the package defines an onload handler which does
                not complete within ONLOAD_TIMEOUT seconds.
        '''
        if not os.path.isfile(text):
            raise s_exc.NoSuchFile(mesg='Storm Package filepath does not exist', path=text)

        core = self._reqCore()
        pkg = s_genpkg.loadPkgProto(text)

        # Register the waiter before adding the package so the event is not missed.
        if pkg.get('onload') is not None:
            waiter = core.waiter(1, 'core:pkg:onload:complete')
        else:
            waiter = None

        await core.addStormPkg(pkg)

        if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT):
            raise s_exc.SynErr(mesg=f'Package onload failed to run for {pkg.get("name")}')

    async def _handleStormPre(self, text):
        '''
        Run a Storm query to prepare the Cortex without output.

        Args:
            text (str): A valid Storm query
        '''
        core = self._reqCore()

        self.context.setdefault('storm-opts', {})
        stormopts = self.context.get('storm-opts')
        stormopts.setdefault('vars', {})

        # only map env vars in for storm-pre
        stormopts = copy.deepcopy(stormopts)
        stormopts['vars'].update(self.stormvars)

        soutp = StormOutput(core, self.context, stormopts=stormopts)
        await soutp.runCmdLine(text)

        # An expected failure only applies to a single directive.
        self.context.pop('storm-fail', None)

    async def _handleStormSvc(self, text):
        '''
        Load a Storm service by ctor and add to the Cortex.

        Args:
            text (str): <ctor> <svcname> <optional JSON string to use as svcconf>

        Raises:
            s_exc.SynErr: If the service packages define onload handlers which
                do not complete within ONLOAD_TIMEOUT seconds.
        '''
        core = self._reqCore()

        splts = text.split(' ', 2)
        ctor, svcname = splts[:2]
        svcconf = s_json.loads(splts[2].strip()) if len(splts) == 3 else {}

        svc = await self._getCell(ctor, conf=svcconf)

        # Register the waiter before adding the service so no event is missed.
        onloadcnt = len([p for p in svc.cellapi._storm_svc_pkgs if p.get('onload') is not None])
        waiter = core.waiter(onloadcnt, 'core:pkg:onload:complete') if onloadcnt else None

        svc.dmon.share('svc', svc)
        root = await svc.auth.getUserByName('root')
        await root.setPasswd('root')
        info = await svc.dmon.listen('tcp://127.0.0.1:0/')
        svc.dmon.test_addr = info
        host, port = info

        # Credentials and host match the password set and the listener bound above.
        surl = f'tcp://root:root@127.0.0.1:{port}/svc'
        await core.nodes(f'service.add {svcname} {surl}')
        await core.nodes(f'$lib.service.wait({svcname})')

        if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT):
            raise s_exc.SynErr(mesg=f'Package onload failed to run for service {svcname}')

    async def _handleStormFail(self, text):
        '''
        Declare whether the next Storm directive is expected to fail.

        Args:
            text (str): A JSON boolean, "true" or "false".
        '''
        valu = s_json.loads(text)
        assert valu in (True, False), f'storm-fail must be a boolean: {text}'
        self.context['storm-fail'] = valu

    def _getStormMultiline(self, text):
        # Resolve "MULTILINE=<key>" to the text stored by storm-multiline;
        # any other text is returned unchanged.
        if '=' in text:
            sentinel, key = text.split('=', 1)
            if sentinel != 'MULTILINE':
                return text
            ret = self.context.get('multiline', {}).get(key)
            assert ret is not None, f'Invalid multiline text: {text}'
            return ret
        return text

    async def _handleStormMultiline(self, text):
        '''
        Store a multi-line string for later use by storm / storm-cli directives.

        Args:
            text (str): <KEY>=<JSON string>, where KEY is upper case.
        '''
        key, valu = text.split('=', 1)
        assert key.isupper()
        valu = s_json.loads(valu)
        assert isinstance(valu, str)
        multi = self.context.get('multiline', {})
        multi[key] = valu
        self.context['multiline'] = multi

    async def _handleStormOpts(self, text):
        '''
        Opts to use in subsequent Storm queries.

        Args:
            text (str): JSON string, e.g. {"vars": {"foo": "bar"}}
        '''
        item = s_json.loads(text)
        self.context['storm-opts'] = item

    async def _handleStormClearHttp(self, text):
        '''
        Reset the storm http context and any associated opts with it.

        Args:
            text (str): true if you also want to clear any storm/vcr opts as well
        '''
        if text == 'true':
            self.context.pop('storm-opts', None)
            self.context.pop('storm-vcr-opts', None)
            self.context.pop('storm-vcr-callback', None)

        self.context.pop('mock-http-path', None)
        self.context.pop('mock-http', None)

    async def _handleStormCortex(self, text):
        '''
        Spin up a default Cortex if ctor=default, else load the defined ctor.

        Any previously created Cortex is finalized first.

        TODO: Handle providing conf in text

        Args:
            text (str): "default" or a ctor (e.g. synapse.cortex.Cortex)
        '''
        if self.core is not None:
            await self.core.fini()
            self.core = None

        ctor = 'synapse.cortex.Cortex' if text == 'default' else text

        self.core = await self._getCell(ctor)

    async def _handleStormEnvVar(self, text):
        '''
        Define a Storm variable from an environment variable with a default.

        Args:
            text (str): <name>=<default value>
        '''
        name, valu = text.split('=', 1)
        name = name.strip()
        valu = valu.strip()
        self.stormvars[name] = os.getenv(name, valu)

    async def _handleStormExpect(self, text):
        # TODO handle some light weight output confirmation.
        return

    async def _handleStormMockHttp(self, text):
        '''
        Setup an HTTP mock file to be used with a later Storm command.

        Response file format:
        {
            "code": int,
            "body": {
                "data": json or a json str
            }
        }

        Args:
            text (str): Path to a json file with the response.
        '''
        self.context['mock-http-path'] = text

    async def _handleStormVcrOpts(self, text):
        '''
        Opts to pass to VCRPY for use in generating docs

        Args:
            text (str): JSON string, e.g. {"filter_query_args": true}
        '''
        item = s_json.loads(text)
        self.context['storm-vcr-opts'] = item

    async def _handleStormVcrCallback(self, text):
        '''
        Get a callback function as a dynlocal

        Args:
            text (str): The dynamic local path of the callback.

        Raises:
            s_exc.NoSuchCtor: If the callback cannot be resolved.
        '''
        cb = s_dyndeps.getDynLocal(text)
        if cb is None:
            raise s_exc.NoSuchCtor(mesg=f'Failed to get callback "{text}"', ctor=text)
        self.context['storm-vcr-callback'] = cb

    async def _handleShell(self, text):
        '''
        Execute shell with the supplied arguments.

        Args:
            text (str): The command line, optionally including the
                --include-stderr, --hide-query and --fail-ok flags.

        Raises:
            s_exc.SynErr: If the command exits non-zero and --fail-ok is not given.
        '''
        parser = argparse.ArgumentParser(add_help=False)
        parser.add_argument('--include-stderr', action='store_true', help='Include stderr in output.')
        parser.add_argument('--hide-query', action='store_true', help='Do not include the command in the output.')
        parser.add_argument('--fail-ok', action='store_true', help='Non-zero return values are non-fatal.')
        # Unrecognized arguments form the command which is executed.
        opts, args = parser.parse_known_args(shlex.split(text))

        # Remove any command line arguments
        query = text
        query = query.replace('--include-stderr', '')
        query = query.replace('--hide-query', '')
        query = query.replace('--fail-ok', '')
        query = query.strip()

        env = dict(os.environ)
        env.update(self.context.get('shell-env', {}))

        stderr = None
        if opts.include_stderr:
            stderr = subprocess.STDOUT

        proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=stderr, env=env, text=True)

        if proc.returncode != 0 and not opts.fail_ok:
            mesg = f'Error when executing shell directive: {text} (rv: {proc.returncode})'
            raise s_exc.SynErr(mesg=mesg)

        self._printf('::\n\n')

        if not opts.hide_query:
            self._printf(f' {query}\n\n')

        for line in proc.stdout.splitlines():
            self._printf(f' {line}\n')

        self._printf('\n\n')

    async def _handleShellEnv(self, text):
        '''
        Env to use in subsequent shell queries.

        Args:
            text (str): [KEY=VALUE ...]

        Note:
            No arguments will reset the shell environment.
        '''
        text = text.strip()
        if not text:
            # Resetting when nothing was ever set is not an error.
            return self.context.pop('shell-env', None)

        env = {}

        # split() tolerates repeated whitespace between the pairs, and the
        # maxsplit keeps any "=" characters which are part of the value.
        for item in text.split():
            key, val = item.split('=', 1)
            env[key] = val

        self.context['shell-env'] = env

    async def _readline(self, line):
        # Dispatch a directive line to its handler; copy any other line to
        # the output unchanged.
        match = re_directive.match(line)

        if match is not None:
            directive, text = match.groups()

            # The text group is None when the line ends right after the "::".
            text = (text or '').strip()

            handler = self._getHandler(directive)
            logger.debug(f'Executing {directive} -> {text}')
            await handler(text)
            return

        self._printf(line)

    async def run(self):
        '''
        Parses the specified RST file with Storm directive handling.

        Returns:
            list: List of line strings for the RST output
        '''
        with open(self.rstfile, 'r') as fd:
            lines = fd.readlines()

        for line in lines:
            await self._readline(line)

        return self.linesout