Source code for synapse.utils.stormcov.plugin

import os
import lark
import regex
import logging
import coverage

from coverage.exceptions import NoSource

import synapse.common as s_common

import synapse.lib.datfile as s_datfile

logger = logging.getLogger(__name__)

[docs]class StormPlugin(coverage.CoveragePlugin, coverage.FileTracer): def __init__(self, options): extensions = options.get('storm_extensions', 'storm') self.extensions = [e.strip() for e in extensions.split(',')] with s_datfile.openDatFile('synapse.lib/storm.lark') as larkf: grammar = larkf.read().decode() self.parser = lark.Lark(grammar, start='query', debug=True, regex=True, parser='lalr', keep_all_tokens=True, maybe_placeholders=False, propagate_positions=True) self.node_map = {} self.text_map = {} self.guid_map = {} self.subq_map = {} dirs = options.get('storm_dirs', '.') if dirs: self.stormdirs = [d.strip() for d in dirs.split(',')] for dirn in self.stormdirs: self.find_storm_files(dirn)
[docs] def find_storm_files(self, dirn): for path in self.find_executable_files(dirn): with open(path, 'r') as f: apth = os.path.abspath(path) tree = self.parser.parse(f.read()) self.find_subqueries(tree, apth) guid = s_common.guid(str(tree)) self.guid_map[guid] = apth
[docs] def find_subqueries(self, tree, path): for rule in ('argvquery', 'embedquery'): for node in tree.find_data(rule): subq = node.children[1] if subq.meta.empty: continue subg = s_common.guid(str(subq)) line = (subq.meta.line - 1) if subg in self.subq_map: (pname, pline) = self.subq_map[subg] logger.warning(f'Duplicate {rule} in {path} at line {line + 1}, coverage will ' f'be reported on first instance in {pname} at line {pline + 1}') continue self.subq_map[subg] = (path, subq.meta.line - 1)
[docs] def file_tracer(self, filename): if filename.endswith('synapse/lib/ast.py'): return self if filename.endswith('synapse/lib/stormctrl.py'): return StormCtrlTracer(self) if filename.endswith('synapse/lib/snap.py'): return PivotTracer(self) return None
[docs] def file_reporter(self, filename): return StormReporter(filename, self.parser)
[docs] def find_executable_files(self, src_dir): rx = r"^[^#~!$@%^&*()+=,]+\.(" + "|".join(self.extensions) + r")$" for (dirpath, dirnames, filenames) in os.walk(src_dir): for filename in filenames: if regex.search(rx, filename): path = os.path.join(dirpath, filename) yield path
[docs] def has_dynamic_source_filename(self): return True
PARSE_METHODS = {'compute', 'once', 'lift', 'getPivsOut', 'getPivsIn'}
[docs] def dynamic_source_filename(self, filename, frame, force=False): if frame.f_code.co_name == 'pullgenr': if frame.f_back.f_code.co_name != 'execStormCmd': return None frame = frame.f_back.f_back elif frame.f_code.co_name not in self.PARSE_METHODS and not force: return None realnode = frame.f_locals.get('self') node = realnode while hasattr(node, 'parent'): node = node.parent nodeid = id(node) info = self.node_map.get(nodeid, s_common.novalu) if info is None: return if info is not s_common.novalu: realnode._coverage_offs = info[1] return info[0] if not node.__class__.__name__ == 'Query': self.node_map[nodeid] = None return info = self.text_map.get(node.text, s_common.novalu) if info is not s_common.novalu: realnode._coverage_offs = info[1] return info[0] tree = self.parser.parse(node.text) guid = s_common.guid(str(tree)) subq = self.subq_map.get(guid) if subq is not None: (filename, offs) = subq else: filename = self.guid_map.get(guid) offs = 0 realnode._coverage_offs = offs self.node_map[nodeid] = (filename, offs) self.text_map[node.text] = (filename, offs) return filename
[docs] def line_number_range(self, frame): if frame.f_code.co_name == 'pullgenr': frame = frame.f_back.f_back astn = frame.f_locals.get('self') offs = astn._coverage_offs strt = astn.astinfo.sline if astn.astinfo.isterm: fini = astn.astinfo.eline else: fini = strt return (strt + offs, fini + offs)
[docs]class StormCtrlTracer(coverage.FileTracer): def __init__(self, parent): self.parent = parent
[docs] def has_dynamic_source_filename(self): return True
[docs] def dynamic_source_filename(self, filename, frame): if frame.f_code.co_name != '__init__': return None return self.parent.dynamic_source_filename(None, frame.f_back, force=True)
[docs] def line_number_range(self, frame): return self.parent.line_number_range(frame.f_back)
[docs]class PivotTracer(coverage.FileTracer): def __init__(self, parent): self.parent = parent
[docs] def has_dynamic_source_filename(self): return True
PARSE_METHODS = {'nodesByPropValu', 'nodesByPropArray', 'nodesByTag', 'getNodeByNdef'}
[docs] def dynamic_source_filename(self, filename, frame): if frame.f_code.co_name not in self.PARSE_METHODS or frame.f_back.f_code.co_name != 'run': return None return self.parent.dynamic_source_filename(None, frame.f_back, force=True)
[docs] def line_number_range(self, frame): return self.parent.line_number_range(frame.f_back)
TOKENS = [ 'ABSPROP', 'ABSPROPNOUNIV', 'PROPS', 'UNIVNAME', 'EXPRUNIVNAME', 'RELNAME', 'EXPRRELNAME', 'ALLTAGS', 'BREAK', 'CONTINUE', 'CMDNAME', 'TAGMATCH', 'NONQUOTEWORD', 'VARTOKN', 'EXPRVARTOKN', 'NUMBER', 'HEXNUMBER', 'OCTNUMBER', 'BOOL', 'EXPRTIMES', '_EMIT', '_STOP', '_RETURN', ]
[docs]class StormReporter(coverage.FileReporter): def __init__(self, filename, parser): super().__init__(filename) self._parser = parser self._source = None
[docs] def source(self): if self._source is None: try: with open(self.filename, 'r') as f: self._source = f.read() except (OSError, UnicodeError) as exc: raise NoSource(f"Couldn't read {self.filename}: {exc}") return self._source
[docs] def lines(self): source_lines = set() tree = self._parser.parse(self.source()) for token in tree.scan_values(lambda v: isinstance(v, lark.lexer.Token)): if token.type in TOKENS: source_lines.add(token.line) return source_lines