Source code for synapse.cmds.cortex

import os
import json
import queue
import shlex
import pprint
import logging

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.cli as s_cli
import synapse.lib.cmd as s_cmd
import synapse.lib.node as s_node
import synapse.lib.time as s_time
import synapse.lib.msgpack as s_msgpack

logger = logging.getLogger(__name__)

RED = '#ff0066'
YELLOW = '#f4e842'
BLUE = '#6faef2'
DARKBLUE = '#4842f5'

PROVNEW_COLOR = DARKBLUE
UNKNOWN_COLOR = BLUE
NODEEDIT_COLOR = "lightblue"
WARNING_COLOR = YELLOW
SYNTAX_ERROR_COLOR = RED

[docs]class Log(s_cli.Cmd): '''Add a storm log to the local command session. Notes: By default, the log file contains all messages received from the execution of a Storm query by the current CLI. By default, these messages are saved to a file located in ~/.syn/stormlogs/storm_(date).(format). Examples: # Enable logging all messages to mpk files (default) log --on # Disable logging and close the current file log --off # Enable logging, but only log edits. Log them as jsonl instead of mpk. log --on --edits-only --format jsonl # Enable logging, but log to a custom path: log --on --path /my/aweome/log/directory/storm20010203.mpk # Log only the node messages which come back from a storm cmd execution. log --on --nodes-only --path /my/awesome/log/directory/stormnodes20010203.mpk ''' _cmd_name = 'log' _cmd_syntax = ( ('line', {'type': 'glob'}), # type: ignore ) def _make_argparser(self): parser = s_cmd.Parser(prog='log', outp=self, description=self.__doc__) muxp = parser.add_mutually_exclusive_group(required=True) muxp.add_argument('--on', action='store_true', default=False, help='Enables logging of storm messages to a file.') muxp.add_argument('--off', action='store_true', default=False, help='Disables message logging and closes the current storm file.') parser.add_argument('--format', choices=('mpk', 'jsonl'), default='mpk', type=str.lower, help='The format used to save messages to disk. Defaults to msgpack (mpk).') parser.add_argument('--path', type=str, default=None, help='The path to the log file. This will append messages to a existing file.') optmux = parser.add_mutually_exclusive_group() optmux.add_argument('--edits-only', action='store_true', default=False, help='Only records edits. Does not record any other messages.') optmux.add_argument('--nodes-only', action='store_true', default=False, help='Only record the packed nodes returned by storm.') return parser def __init__(self, cli, **opts): s_cli.Cmd.__init__(self, cli, **opts) # Give ourselves a local ref to locs since we're stateful. self.locs = self._cmd_cli.locs self._cmd_cli.onfini(self.closeLogFd)
[docs] def onStormMesg(self, mesg): self.locs.get('log:queue').put(mesg)
[docs] @s_common.firethread def queueLoop(self): q = self.locs.get('log:queue') while not self._cmd_cli.isfini: try: mesg = q.get(timeout=2) except queue.Empty: continue smesg = mesg[1].get('mesg') self.save(smesg)
[docs] def save(self, mesg): fd = self.locs.get('log:fd') editsonly = self.locs.get('log:editsonly') nodesonly = self.locs.get('log:nodesonly') if fd and not fd.closed: if editsonly and mesg[0] != 'node:edits': return if nodesonly: if mesg[0] != 'node': return mesg = mesg[1] try: buf = self.encodeMsg(mesg) except Exception as e: # pragma: no cover logger.error('Failed to serialize message: [%s]', str(e)) return fd.write(buf)
[docs] def encodeMsg(self, mesg): '''Get byts for a message''' fmt = self.locs.get('log:fmt') if fmt == 'jsonl': s = json.dumps(mesg, sort_keys=True) + '\n' buf = s.encode() return buf elif fmt == 'mpk': buf = s_msgpack.en(mesg) return buf mesg = f'Unknown encoding format: {fmt}' raise s_exc.SynErr(mesg=mesg)
[docs] def closeLogFd(self): self._cmd_cli.off('storm:mesg', self.onStormMesg) q = self.locs.pop('log:queue', None) if q is not None: self.printf('Marking log queue done') thr = self.locs.pop('log:thr', None) if thr: self.printf('Joining log thread.') thr.join(2) fp = self.locs.pop('log:fp', None) fd = self.locs.pop('log:fd', None) for key in list(self.locs.keys()): if key.startswith('log:'): self.locs.pop(key, None) if fd: try: self.printf(f'Closing logfile: [{fp}]') fd.close() except Exception as e: # pragma: no cover self.printf(f'Failed to close fd: [{str(e)}]')
[docs] def openLogFd(self, opts): opath = self.locs.get('log:fp') if opath: self.printf('Must call --off to disable current file before starting a new file.') return fmt = opts.format path = opts.path nodes_only = opts.nodes_only edits_only = opts.edits_only if not path: ts = s_time.repr(s_common.now(), True) fn = f'storm_{ts}.{fmt}' path = s_common.getSynPath('stormlogs', fn) self.printf(f'Starting logfile at [{path}]') q = queue.Queue() fd = s_common.genfile(path) # Seek to the end of the file. Allows a user to append to a file. fd.seek(0, 2) self.locs['log:fp'] = path self.locs['log:fd'] = fd self.locs['log:fmt'] = fmt self.locs['log:queue'] = q self.locs['log:thr'] = self.queueLoop() self.locs['log:nodesonly'] = nodes_only self.locs['log:editsonly'] = edits_only self._cmd_cli.on('storm:mesg', self.onStormMesg)
[docs] async def runCmdOpts(self, opts): line = opts.get('line', '') try: opts = self._make_argparser().parse_args(shlex.split(line)) except s_exc.ParserExit: return if opts.on: return self.openLogFd(opts) if opts.off: return self.closeLogFd()
[docs]class StormCmd(s_cli.Cmd): ''' Execute a storm query. Syntax: storm <query> Arguments: query: The storm query Optional Arguments: --hide-tags: Do not print tags. --hide-props: Do not print secondary properties. --hide-unknown: Do not print messages which do not have known handlers. --show-nodeedits: Show full nodeedits (otherwise printed as a single . per edit). --editformat <format>: What format of edits the server shall emit. Options are * nodeedits (default), * count (just counts of nodeedits), or * none (no such messages emitted). --show-prov: Deprecated. This no longer does anything. --raw: Print the nodes in their raw format. This overrides --hide-tags and --hide-props. --debug: Display cmd debug information along with nodes in raw format. This overrides other display arguments. --path: Get path information about returned nodes. --show <names>: Limit storm events (server-side) to the comma-separated list. --file <path>: Run the storm query specified in the given file path. --optsfile <path>: Run the query with the given options from a JSON/YAML file. Examples: storm inet:ipv4=1.2.3.4 storm --debug inet:ipv4=1.2.3.4 ''' _cmd_name = 'storm' editformat_enums = ('nodeedits', 'count', 'none') _cmd_syntax = ( ('--hide-tags', {}), # type: ignore ('--show', {'type': 'valu'}), ('--show-nodeedits', {}), ('--show-prov', {}), # TODO: remove in 3.0.0 ('--editformat', {'type': 'enum', 'defval': 'nodeedits', 'enum:vals': editformat_enums}), ('--file', {'type': 'valu'}), ('--optsfile', {'type': 'valu'}), ('--hide-props', {}), ('--hide-unknown', {}), ('--raw', {}), ('--debug', {}), ('--path', {}), ('--save-nodes', {'type': 'valu'}), ('query', {'type': 'glob'}), ) def __init__(self, cli, **opts): s_cli.Cmd.__init__(self, cli, **opts) self.cmdmeths = { 'node': self._onNode, 'init': self._onInit, 'fini': self._onFini, 'print': self._onPrint, 'warn': self._onWarn, 'err': self._onErr, 'node:edits': self._onNodeEdits, 'node:edits:count': self._onNodeEditsCount, } self._indented = False
[docs] def printf(self, mesg, addnl=True, color=None): if self._indented: s_cli.Cmd.printf(self, '') self._indented = False return s_cli.Cmd.printf(self, mesg, addnl=addnl, color=color)
def _onNodeEdits(self, mesg, opts): edit = mesg[1] if not opts.get('show-nodeedits'): count = sum(len(e[2]) for e in edit.get('edits', ())) s_cli.Cmd.printf(self, '.' * count, addnl=False, color=NODEEDIT_COLOR) self._indented = True return self.printf(repr(mesg), color=NODEEDIT_COLOR) def _onNodeEditsCount(self, mesg, opts): count = mesg[1].get('count', 1) s_cli.Cmd.printf(self, '.' * count, addnl=False, color=NODEEDIT_COLOR) self._indented = True def _printNodeProp(self, name, valu): self.printf(f' {name} = {valu}') def _onNode(self, mesg, opts): node = mesg[1] if opts.get('raw'): self.printf(repr(node)) return formname, formvalu = s_node.reprNdef(node) self.printf(f'{formname}={formvalu}') if not opts.get('hide-props'): for name in sorted(s_node.props(node).keys()): valu = s_node.reprProp(node, name) if name[0] != '.': name = ':' + name self._printNodeProp(name, valu) if not opts.get('hide-tags'): for tag in sorted(s_node.tagsnice(node)): valu = s_node.reprTag(node, tag) tprops = s_node.reprTagProps(node, tag) printed = False if valu: self.printf(f' #{tag} = {valu}') printed = True if tprops: for prop, pval in tprops: self.printf(f' #{tag}:{prop} = {pval}') printed = True if not printed: self.printf(f' #{tag}') def _onInit(self, mesg, opts): tick = mesg[1].get('tick') if tick is not None: tick = s_time.repr(tick) self.printf(f'Executing query at {tick}') def _onFini(self, mesg, opts): took = mesg[1].get('took') took = max(took, 1) count = mesg[1].get('count') pers = float(count) / float(took / 1000) self.printf('complete. %d nodes in %d ms (%d/sec).' % (count, took, pers)) def _onPrint(self, mesg, opts): self.printf(mesg[1].get('mesg')) def _onWarn(self, mesg, opts): info = mesg[1] warn = info.pop('mesg', '') xtra = ', '.join([f'{k}={v}' for k, v in info.items()]) if xtra: warn = ' '.join([warn, xtra]) self.printf(f'WARNING: {warn}', color=WARNING_COLOR) def _onErr(self, mesg, opts): err = mesg[1] if err[0] == 'BadSyntax': pos = err[1].get('at', None) text = err[1].get('text', None) tlen = len(text) mesg = err[1].get('mesg', None) if pos is not None and text is not None and mesg is not None: text = text.replace('\n', ' ') # Handle too-long text if tlen > 60: text = text[max(0, pos - 30):pos + 30] if pos < tlen - 30: text += '...' if pos > 30: text = '...' + text pos = 33 self.printf(text, color=BLUE) self.printf(f'{" "*pos}^', color=BLUE) self.printf(f'Syntax Error: {mesg}', color=SYNTAX_ERROR_COLOR) return self.printf(f'ERROR: {err}', color=SYNTAX_ERROR_COLOR)
[docs] async def runCmdOpts(self, opts): text = opts.get('query') filename = opts.get('file') if bool(text) == bool(filename): self.printf('Cannot use a storm file and manual query together.') self.printf(self.__doc__) return if filename is not None: try: with open(filename, 'r') as fd: text = fd.read() except FileNotFoundError: self.printf('file not found: %s' % (filename,)) return stormopts = {} optsfile = opts.get('optsfile') if optsfile is not None: if not os.path.isfile(optsfile): self.printf('optsfile not found: %s' % (optsfile,)) return stormopts = s_common.yamlload(optsfile) hide_unknown = opts.get('hide-unknown', self._cmd_cli.locs.get('storm:hide-unknown')) core = self.getCmdItem() stormopts.setdefault('repr', True) stormopts.setdefault('path', opts.get('path', False)) showtext = opts.get('show') if showtext is not None: stormopts['show'] = showtext.split(',') editformat = opts['editformat'] if editformat != 'nodeedits': stormopts['editformat'] = editformat nodesfd = None if opts.get('save-nodes'): nodesfd = s_common.genfile(opts.get('save-nodes')) nodesfd.truncate(0) try: async for mesg in core.storm(text, opts=stormopts): await self._cmd_cli.fire('storm:mesg', mesg=mesg) if opts.get('debug'): self.printf(pprint.pformat(mesg)) continue if mesg[0] == 'node': if nodesfd is not None: byts = json.dumps(mesg[1]).encode() nodesfd.write(byts + b'\n') try: func = self.cmdmeths[mesg[0]] except KeyError: if hide_unknown: continue self.printf(repr(mesg), color=UNKNOWN_COLOR) else: func(mesg, opts) except s_exc.SynErr as e: if e.errinfo.get('errx') == 'CancelledError': self.printf('query canceled.') return raise finally: if nodesfd is not None: nodesfd.close()