Source code for synapse.lib.rstorm

import os
import copy
import json
import pprint
import logging
import contextlib
import collections

import vcr
import regex

from unittest import mock

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.output as s_output
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.stormhttp as s_stormhttp

import synapse.cmds.cortex as s_cmds_cortex

import synapse.tools.storm as s_storm
import synapse.tools.genpkg as s_genpkg


re_directive = regex.compile(r'^\.\.\s(storm.*|[^:])::(?:\s(.*)$|$)')

logger = logging.getLogger(__name__)

ONLOAD_TIMEOUT = int(os.getenv('SYNDEV_PKG_LOAD_TIMEOUT', 30))  # seconds

[docs]class OutPutRst(s_output.OutPutStr): ''' Rst specific helper for output intended to be indented in RST text as a literal block. ''' prefix = ' '
[docs] def printf(self, mesg, addnl=True): if '\n' in mesg: logger.debug(f'Newline found in [{mesg}]') parts = mesg.split('\n') mesg0 = '\n'.join([self.prefix + part for part in parts[1:]]) mesg = '\n'.join((parts[0], mesg0)) return s_output.OutPutStr.printf(self, mesg, addnl)
[docs]class StormOutput(s_cmds_cortex.StormCmd): ''' Produce standard output from a stream of storm runtime messages. Must be instantiated for a single query with a rstorm context. ''' _cmd_syntax = ( ('--hide-query', {}), ) + s_cmds_cortex.StormCmd._cmd_syntax def __init__(self, core, ctx, stormopts=None, opts=None): if opts is None: opts = {} s_cmds_cortex.StormCmd.__init__(self, None, **opts) self.stormopts = stormopts or {} # hide a few mesg types by default for mtype in ('init', 'fini', 'node:edits', 'node:edits:count', ): self.cmdmeths[mtype] = self._silence self.core = core self.ctx = ctx self.lines = [] self.prefix = ' '
[docs] async def runCmdLine(self, line): opts = self.getCmdOpts(f'storm {line}') return await self.runCmdOpts(opts)
def _printNodeProp(self, name, valu): base = f' {name} = ' if '\n' in valu: parts = collections.deque(valu.split('\n')) ws = ' ' * len(base) self.printf(f'{base}{parts.popleft()}') while parts: part = parts.popleft() self.printf(f'{ws}{part}') else: self.printf(f'{base}{valu}') async def _mockHttp(self, *args, **kwargs): info = { 'code': 200, 'body': '{}', } resp = self.ctx.get('mock-http') if resp: body = resp.get('body') if isinstance(body, (dict, list)): body = json.dumps(body) info = { 'code': resp.get('code', 200), 'body': body.encode(), } return s_stormhttp.HttpResp(info) @contextlib.contextmanager def _shimHttpCalls(self, vcr_kwargs): path = self.ctx.get('mock-http-path') if not vcr_kwargs: vcr_kwargs = {} if path: path = os.path.abspath(path) # try it as json first (since yaml can load json...). if it parses, we're old school # if it doesn't, either it doesn't exist/we can't read it/we can't parse it. # in any of those cases, default to using vcr try: with open(path, 'r') as fd: byts = json.load(fd) except (FileNotFoundError, json.decoder.JSONDecodeError): byts = None if not byts: recorder = vcr.VCR(**vcr_kwargs) vcrcb = self.ctx.get('storm-vcr-callback', None) if vcrcb: vcrcb(recorder) with recorder.use_cassette(os.path.abspath(path)) as cass: yield cass self.ctx.pop('mock-http-path', None) else: # backwards compat if not os.path.isfile(path): raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=path) self.ctx['mock-http'] = byts with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp): yield else: yield
[docs] def printf(self, mesg, addnl=True, color=None): line = f'{self.prefix}{mesg}' if '\n' in line: logger.debug(f'Newline found in [{mesg}]') parts = line.split('\n') mesg0 = '\n'.join([self.prefix + part for part in parts[1:]]) line = '\n'.join((parts[0], mesg0)) self.lines.append(line) return line
def _silence(self, mesg, opts): pass def _onErr(self, mesg, opts): # raise on err for rst if self.ctx.pop('storm-fail', None): s_cmds_cortex.StormCmd._onErr(self, mesg, opts) return raise s_exc.StormRuntimeError(mesg=mesg)
[docs] async def runCmdOpts(self, opts): text = opts.get('query') if not opts.get('hide-query'): self.printf(f'> {text}') stormopts = copy.deepcopy(self.stormopts) stormopts.setdefault('repr', True) stormopts.setdefault('path', opts.get('path', False)) hide_unknown = True # Let this raise on any errors with self._shimHttpCalls(self.ctx.get('storm-vcr-opts')): async for mesg in self.core.storm(text, opts=stormopts): if opts.get('debug'): self.printf(pprint.pformat(mesg)) continue try: func = self.cmdmeths[mesg[0]] except KeyError: # pragma: no cover if hide_unknown: continue self.printf(repr(mesg), color=s_cmds_cortex.UNKNOWN_COLOR) else: func(mesg, opts) return '\n'.join(self.lines)
[docs]class StormCliOutput(s_storm.StormCli): async def __anit__(self, item, outp=s_output.stdout, opts=None): await s_storm.StormCli.__anit__(self, item, outp, opts) self.ctx = {} self._print_skips.append('init') self._print_skips.append('fini') self._print_skips.append('prov:new') # TODO: Remove in v3.0.0 self._print_skips.append('node:edits') self._print_skips.append('node:edits:count')
[docs] def printf(self, mesg, addnl=True, color=None): mesg = f' {mesg}' s_storm.StormCli.printf(self, mesg, addnl, color)
[docs] async def handleErr(self, mesg): # raise on err for rst if self.ctx.pop('storm-fail', None): await s_storm.StormCli.handleErr(self, mesg) return raise s_exc.StormRuntimeError(mesg=mesg)
def _printNodeProp(self, name, valu): base = f' {name} = ' if '\n' in valu: parts = collections.deque(valu.split('\n')) ws = ' ' * len(base) self.printf(f'{base}{parts.popleft()}') while parts: part = parts.popleft() self.printf(f'{ws}{part}') else: self.printf(f'{base}{valu}') async def _mockHttp(self, *args, **kwargs): info = { 'code': 200, 'body': '{}', } resp = self.ctx.get('mock-http') if resp: body = resp.get('body') if isinstance(body, (dict, list)): body = json.dumps(body) info = { 'code': resp.get('code', 200), 'body': body.encode(), } return s_stormhttp.HttpResp(info) @contextlib.contextmanager def _shimHttpCalls(self, vcr_kwargs): path = self.ctx.get('mock-http-path') if not vcr_kwargs: vcr_kwargs = {} if path: path = os.path.abspath(path) # try it as json first (since yaml can load json...). if it parses, we're old school # if it doesn't, either it doesn't exist/we can't read it/we can't parse it. # in any of those cases, default to using vcr try: with open(path, 'r') as fd: byts = json.load(fd) except (FileNotFoundError, json.decoder.JSONDecodeError): byts = None if not byts: with vcr.use_cassette(os.path.abspath(path), **vcr_kwargs) as cass: yield cass self.ctx.pop('mock-http-path', None) else: # backwards compat if not os.path.isfile(path): raise s_exc.NoSuchFile(mesg='Storm HTTP mock filepath does not exist', path=path) self.ctx['mock-http'] = byts with mock.patch('synapse.lib.stormhttp.LibHttp._httpRequest', new=self._mockHttp): yield else: yield
[docs] async def runRstCmdLine(self, text, ctx, stormopts=None): self.ctx = ctx self.printf(self.cmdprompt + text) with self._shimHttpCalls(self.ctx.get('storm-vcr-opts')): await self.runCmdLine(text, opts=stormopts) return str(self.outp)
[docs]@contextlib.asynccontextmanager async def getCell(ctor, conf): loc = s_dyndeps.getDynLocal(ctor) if loc is None: raise s_exc.NoSuchCtor(mesg=f'Unable to resolve ctor [{ctor}]', ctor=ctor) with s_common.getTempDir() as dirn: async with await loc.anit(dirn, conf=conf) as cell: yield cell
[docs]class StormRst(s_base.Base): async def __anit__(self, rstfile): await s_base.Base.__anit__(self) self.rstfile = s_common.genpath(rstfile) if not os.path.isfile(rstfile): raise s_exc.BadConfValu(mesg='A valid rstfile must be specified', rstfile=self.rstfile) self.linesout = [] self.context = {} self.stormvars = {} self.core = None self.handlers = { 'storm': self._handleStorm, 'storm-cli': self._handleStormCli, 'storm-pkg': self._handleStormPkg, 'storm-pre': self._handleStormPre, 'storm-svc': self._handleStormSvc, 'storm-fail': self._handleStormFail, 'storm-opts': self._handleStormOpts, 'storm-cortex': self._handleStormCortex, 'storm-envvar': self._handleStormEnvVar, 'storm-expect': self._handleStormExpect, 'storm-multiline': self._handleStormMultiline, 'storm-mock-http': self._handleStormMockHttp, 'storm-vcr-opts': self._handleStormVcrOpts, 'storm-clear-http': self._handleStormClearHttp, 'storm-vcr-callback': self._handleStormVcrCallback, } async def _getCell(self, ctor, conf=None): if conf is None: conf = {} cell = await self.enter_context(getCell(ctor, conf)) return cell def _printf(self, line): self.linesout.append(line) def _reqCore(self): if self.core is None: mesg = 'No cortex set. Use .. storm-cortex::' raise s_exc.NoSuchVar(mesg=mesg) return self.core def _getHandler(self, directive): handler = self.handlers.get(directive) if handler is None: raise s_exc.NoSuchName(mesg=f'The {directive} directive is not supported', directive=directive) return handler async def _handleStorm(self, text): ''' Run a Storm command and generate text from the output. Args: text (str): A valid Storm query. ''' core = self._reqCore() text = self._getStormMultiline(text) self._printf('::\n') self._printf('\n') soutp = StormOutput(core, self.context, stormopts=self.context.get('storm-opts')) self._printf(await soutp.runCmdLine(text)) if self.context.pop('storm-fail', None): raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.') self._printf('\n\n') async def _handleStormCli(self, text): core = self._reqCore() outp = OutPutRst() text = self._getStormMultiline(text) self._printf('::\n') self._printf('\n') cli = await StormCliOutput.anit(item=core, outp=outp) self._printf(await cli.runRstCmdLine(text, self.context, stormopts=self.context.get('storm-opts'))) if self.context.pop('storm-fail', None): raise s_exc.StormRuntimeError(mesg='Expected a failure, but none occurred.') self._printf('\n') async def _handleStormPkg(self, text): ''' Load a Storm package into the Cortex by path. Args: text (str): The path to a Storm package YAML file. ''' if not os.path.isfile(text): raise s_exc.NoSuchFile(mesg='Storm Package filepath does not exist', path=text) core = self._reqCore() pkg = s_genpkg.loadPkgProto(text) if pkg.get('onload') is not None: waiter = core.waiter(1, 'core:pkg:onload:complete') else: waiter = None await core.addStormPkg(pkg) if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT): raise s_exc.SynErr(mesg=f'Package onload failed to run for {pkg.get("name")}') async def _handleStormPre(self, text): ''' Run a Storm query to prepare the Cortex without output. Args: text (str): A valid Storm query ''' core = self._reqCore() self.context.setdefault('storm-opts', {}) stormopts = self.context.get('storm-opts') stormopts.setdefault('vars', {}) # only map env vars in for storm-pre stormopts = copy.deepcopy(stormopts) stormopts['vars'].update(self.stormvars) soutp = StormOutput(core, self.context, stormopts=stormopts) await soutp.runCmdLine(text) self.context.pop('storm-fail', None) async def _handleStormSvc(self, text): ''' Load a Storm service by ctor and add to the Cortex. Args: text (str): <ctor> <svcname> <optional JSON string to use as svcconf> ''' core = self._reqCore() splts = text.split(' ', 2) ctor, svcname = splts[:2] svcconf = json.loads(splts[2].strip()) if len(splts) == 3 else {} svc = await self._getCell(ctor, conf=svcconf) onloadcnt = len([p for p in svc.cellapi._storm_svc_pkgs if p.get('onload') is not None]) waiter = core.waiter(onloadcnt, 'core:pkg:onload:complete') if onloadcnt else None svc.dmon.share('svc', svc) root = await svc.auth.getUserByName('root') await root.setPasswd('root') info = await svc.dmon.listen('tcp://127.0.0.1:0/') svc.dmon.test_addr = info host, port = info surl = f'tcp://root:[email protected]:{port}/svc' await core.nodes(f'service.add {svcname} {surl}') await core.nodes(f'$lib.service.wait({svcname})') if waiter is not None and not await waiter.wait(timeout=ONLOAD_TIMEOUT): raise s_exc.SynErr(mesg=f'Package onload failed to run for service {svcname}') async def _handleStormFail(self, text): valu = json.loads(text) assert valu in (True, False), f'storm-fail must be a boolean: {text}' self.context['storm-fail'] = valu def _getStormMultiline(self, text): if '=' in text: sentinel, key = text.split('=', 1) if sentinel != 'MULTILINE': return text ret = self.context.get('multiline', {}).get(key) assert ret is not None, f'Invalid multiline text: {text}' return ret return text async def _handleStormMultiline(self, text): key, valu = text.split('=', 1) assert key.isupper() valu = json.loads(valu) assert isinstance(valu, str) multi = self.context.get('multiline', {}) multi[key] = valu self.context['multiline'] = multi async def _handleStormOpts(self, text): ''' Opts to use in subsequent Storm queries. Args: text (str): JSON string, e.g. {"vars": {"foo": "bar"}} ''' item = json.loads(text) self.context['storm-opts'] = item async def _handleStormClearHttp(self, text): ''' Reset the storm http context and any associated opts with it. Args: text (str): true if you also want to clear any storm/vcr opts as well ''' if text == 'true': self.context.pop('storm-opts', None) self.context.pop('storm-vcr-opts', None) self.context.pop('storm-vcr-callback', None) self.context.pop('mock-http-path', None) self.context.pop('mock-http', None) async def _handleStormCortex(self, text): ''' Spin up a default Cortex if ctor=default, else load the defined ctor. TODO: Handle providing conf in text Args: text (str): "default" or a ctor (e.g. synapse.cortex.Cortex) ''' if self.core is not None: await self.core.fini() self.core = None ctor = 'synapse.cortex.Cortex' if text == 'default' else text self.core = await self._getCell(ctor) async def _handleStormEnvVar(self, text): name, valu = text.split('=', 1) name = name.strip() valu = valu.strip() self.stormvars[name] = os.getenv(name, valu) async def _handleStormExpect(self, text): # TODO handle some light weight output confirmation. return async def _handleStormMockHttp(self, text): ''' Setup an HTTP mock file to be used with a later Storm command. Response file format: { "code": int, "body": { "data": json or a json str } } Args: text (str): Path to a json file with the response. ''' self.context['mock-http-path'] = text async def _handleStormVcrOpts(self, text): ''' Opts to pass to VCRPY for use in generating docs Args: text (str): JSON string, e.g. {"filter_query_args": true} ''' item = json.loads(text) self.context['storm-vcr-opts'] = item async def _handleStormVcrCallback(self, text): ''' Get a callback function as a dynlocal ''' cb = s_dyndeps.getDynLocal(text) if cb is None: raise s_exc.NoSuchCtor(mesg=f'Failed to get callback "{text}"', ctor=text) self.context['storm-vcr-callback'] = cb async def _readline(self, line): match = re_directive.match(line) if match is not None: directive, text = match.groups() text = text.strip() handler = self._getHandler(directive) logger.debug(f'Executing {directive} -> {text}') await handler(text) return self._printf(line)
[docs] async def run(self): ''' Parses the specified RST file with Storm directive handling. Returns: list: List of line strings for the RST output ''' with open(self.rstfile, 'r') as fd: lines = fd.readlines() for line in lines: await self._readline(line) return self.linesout