Source code for synapse.lib.trigger

import asyncio
import logging
import contextlib
import collections
import contextvars

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.chop as s_chop
import synapse.lib.cache as s_cache
import synapse.lib.config as s_config
import synapse.lib.grammar as s_grammar

logger = logging.getLogger(__name__)

Conditions = set((
    'tag:add',
    'tag:del',
    'node:add',
    'node:del',
    'prop:set',
    'edge:add',
    'edge:del'
))

RecursionDepth = contextvars.ContextVar('RecursionDepth', default=0)

# TODO: standardize locations for form/prop/tags regex

tagrestr = r'((\w+|\*|\*\*)\.)*(\w+|\*|\*\*)'  # tag with optional single or double * as segment
_tagre, _formre, _propre = (f'^{re}$' for re in (tagrestr, s_grammar.formrestr, s_grammar.proporunivrestr))

TrigSchema = {
    'type': 'object',
    'properties': {
        'iden': {'type': 'string', 'pattern': s_config.re_iden},
        'user': {'type': 'string', 'pattern': s_config.re_iden},
        'view': {'type': 'string', 'pattern': s_config.re_iden},
        'form': {'type': 'string', 'pattern': _formre},
        'n2form': {'type': 'string', 'pattern': _formre},
        'tag': {'type': 'string', 'pattern': _tagre},
        'prop': {'type': 'string', 'pattern': _propre},
        'verb': {'type': 'string', },
        'name': {'type': 'string', },
        'doc': {'type': 'string', },
        'cond': {'enum': ['node:add', 'node:del', 'tag:add', 'tag:del', 'prop:set', 'edge:add', 'edge:del']},
        'storm': {'type': 'string'},
        'async': {'type': 'boolean'},
        'enabled': {'type': 'boolean'},
        'created': {'type': 'integer', 'minimum': 0},
    },
    'additionalProperties': True,
    'required': ['iden', 'user', 'storm', 'enabled'],
    'allOf': [
        {
            'if': {'properties': {'cond': {'const': 'node:add'}}},
            'then': {'required': ['form']},
        },
        {
            'if': {'properties': {'cond': {'const': 'node:del'}}},
            'then': {'required': ['form']},
        },
        {
            'if': {'properties': {'cond': {'const': 'tag:add'}}},
            'then': {'required': ['tag']},
        },
        {
            'if': {'properties': {'cond': {'const': 'tag:del'}}},
            'then': {'required': ['tag']},
        },
        {
            'if': {'properties': {'cond': {'const': 'prop:set'}}},
            'then': {'required': ['prop']},
        },
        {
            'if': {'properties': {'cond': {'const': 'edge:add'}}},
            'then': {'required': ['verb']},
        },
        {
            'if': {'properties': {'cond': {'const': 'edge:del'}}},
            'then': {'required': ['verb']},
        },
    ],
}
TrigSchemaValidator = s_config.getJsValidator(TrigSchema)

[docs]def reqValidTdef(conf): TrigSchemaValidator(conf)
[docs]class Triggers: ''' Manages "triggers", conditions where changes in data result in new storm queries being executed. Note: These methods should not be called directly under normal circumstances. Use the owning "View" object to ensure that mirrors/clusters members get the same changes. ''' def __init__(self, view): self.view = view self.triggers = {} self.tagadd = collections.defaultdict(list) # (form, tag): [ Trigger ... ] self.tagset = collections.defaultdict(list) # (form, tag): [ Trigger ... ] self.tagdel = collections.defaultdict(list) # (form, tag): [ Trigger ... ] self.tagaddglobs = collections.defaultdict(s_cache.TagGlobs) # form: TagGlobs self.tagsetglobs = collections.defaultdict(s_cache.TagGlobs) # form: TagGlobs self.tagdelglobs = collections.defaultdict(s_cache.TagGlobs) # form: TagGlobs self.nodeadd = collections.defaultdict(list) # form: [ Trigger ... ] self.nodedel = collections.defaultdict(list) # form: [ Trigger ... ] self.propset = collections.defaultdict(list) # prop: [ Trigger ... ] self.edgeadd = collections.defaultdict(list) # (n1form, verb, n2form: [ Trigger ... ] self.edgedel = collections.defaultdict(list) # (n1form, verb, n2form: [ Trigger ... ] self.edgeaddglobs = collections.defaultdict(s_cache.EdgeGlobs) # (n1form, n2form: [ EdgeGlobs ... ] self.edgedelglobs = collections.defaultdict(s_cache.EdgeGlobs) # (n1form, n2form: [ EdgeGlobs ... ] self.edgeaddcache = s_cache.LruDict() self.edgedelcache = s_cache.LruDict() @contextlib.contextmanager def _recursion_check(self): depth = RecursionDepth.get() if depth > 64: raise s_exc.RecursionLimitHit(mesg='Hit trigger limit') token = RecursionDepth.set(depth + 1) try: yield finally: RecursionDepth.reset(token)
[docs] async def runNodeAdd(self, node): with self._recursion_check(): [await trig.execute(node) for trig in self.nodeadd.get(node.form.name, ())]
[docs] async def runNodeDel(self, node): with self._recursion_check(): [await trig.execute(node) for trig in self.nodedel.get(node.form.name, ())]
[docs] async def runPropSet(self, node, prop, oldv): vars = {'propname': prop.name, 'propfull': prop.full, 'auto': {'opts': {'propname': prop.name, 'propfull': prop.full, }}, } with self._recursion_check(): [await trig.execute(node, vars=vars) for trig in self.propset.get(prop.full, ())] if prop.univ is not None: [await trig.execute(node, vars=vars) for trig in self.propset.get(prop.univ.full, ())]
[docs] async def runTagAdd(self, node, tag): vars = {'tag': tag, 'auto': {'opts': {'tag': tag}}} with self._recursion_check(): for trig in self.tagadd.get((node.form.name, tag), ()): await trig.execute(node, vars=vars) for trig in self.tagadd.get((None, tag), ()): await trig.execute(node, vars=vars) # check for form specific globs globs = self.tagaddglobs.get(node.form.name) if globs is not None: for _, trig in globs.get(tag): await trig.execute(node, vars=vars) # check for form agnostic globs globs = self.tagaddglobs.get(None) if globs is not None: for _, trig in globs.get(tag): await trig.execute(node, vars=vars)
[docs] async def runTagDel(self, node, tag): vars = {'tag': tag, 'auto': {'opts': {'tag': tag}}, } with self._recursion_check(): for trig in self.tagdel.get((node.form.name, tag), ()): await trig.execute(node, vars=vars) for trig in self.tagdel.get((None, tag), ()): await trig.execute(node, vars=vars) # check for form specific globs globs = self.tagdelglobs.get(node.form.name) if globs is not None: for _, trig in globs.get(tag): await trig.execute(node, vars=vars) # check for form agnostic globs globs = self.tagdelglobs.get(None) if globs is not None: for _, trig in globs.get(tag): await trig.execute(node, vars=vars)
[docs] async def runEdgeAdd(self, n1, verb, n2): n1form = n1.form.name if n1 else None n2form = n2.form.name if n2 else None n2iden = n2.iden() if n2 else None varz = {'auto': {'opts': {'verb': verb, 'n2iden': n2iden}}} with self._recursion_check(): cachekey = (n1form, verb, n2form) cached = self.edgeaddcache.get(cachekey) if cached is None: cached = [] for trig in self.edgeadd.get((None, verb, None), ()): cached.append(trig) globs = self.edgeaddglobs.get((None, None)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n1: for trig in self.edgeadd.get((n1form, verb, None), ()): cached.append(trig) globs = self.edgeaddglobs.get((n1form, None)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n2: for trig in self.edgeadd.get((None, verb, n2form), ()): cached.append(trig) globs = self.edgeaddglobs.get((None, n2form)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n1 and n2: for trig in self.edgeadd.get((n1form, verb, n2form), ()): cached.append(trig) globs = self.edgeaddglobs.get((n1form, n2form)) if globs: for _, trig in globs.get(verb): cached.append(trig) self.edgeaddcache[cachekey] = cached for trig in cached: await trig.execute(n1, vars=varz)
[docs] async def runEdgeDel(self, n1, verb, n2): n1form = n1.form.name if n1 else None n2form = n2.form.name if n2 else None n2iden = n2.iden() if n2 else None varz = {'auto': {'opts': {'verb': verb, 'n2iden': n2iden}}} with self._recursion_check(): cachekey = (n1form, verb, n2form) cached = self.edgedelcache.get(cachekey) if cached is None: cached = [] for trig in self.edgedel.get((None, verb, None), ()): cached.append(trig) globs = self.edgedelglobs.get((None, None)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n1: for trig in self.edgedel.get((n1form, verb, None), ()): cached.append(trig) globs = self.edgedelglobs.get((n1form, None)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n2: for trig in self.edgedel.get((None, verb, n2form), ()): cached.append(trig) globs = self.edgedelglobs.get((None, n2form)) if globs: for _, trig in globs.get(verb): cached.append(trig) if n1 and n2: for trig in self.edgedel.get((n1form, verb, n2form), ()): cached.append(trig) globs = self.edgedelglobs.get((n1form, n2form)) if globs: for _, trig in globs.get(verb): cached.append(trig) self.edgedelcache[cachekey] = cached for trig in cached: await trig.execute(n1, vars=varz)
[docs] async def load(self, tdef): trig = Trigger(self.view, tdef) # Make sure the query parses storm = trig.tdef['storm'] await self.view.core.getStormQuery(storm) cond = trig.tdef.get('cond') tag = trig.tdef.get('tag') form = trig.tdef.get('form') prop = trig.tdef.get('prop') verb = trig.tdef.get('verb') n2form = trig.tdef.get('n2form') if cond not in Conditions: raise s_exc.NoSuchCond(name=cond) if cond in ('node:add', 'node:del') and form is None: raise s_exc.BadOptValu(mesg='form must be present for node:add or node:del') if cond in ('node:add', 'node:del') and tag is not None: raise s_exc.BadOptValu(mesg='tag must not be present for node:add or node:del') if cond in ('tag:add', 'tag:del'): if tag is None: raise s_exc.BadOptValu(mesg='missing tag') s_chop.validateTagMatch(tag) if cond in ('edge:add', 'edge:del') and verb is None: raise s_exc.BadOptValu(mesg='verb must be present for edge:add or edge:del') if prop is not None and cond != 'prop:set': raise s_exc.BadOptValu(mesg='prop parameter invalid') if cond == 'node:add': self.nodeadd[form].append(trig) elif cond == 'node:del': self.nodedel[form].append(trig) elif cond == 'prop:set': if prop is None: raise s_exc.BadOptValu(mesg='missing prop parameter') if form is not None or tag is not None: raise s_exc.BadOptValu(mesg='form and tag must not be present for prop:set') self.propset[prop].append(trig) elif cond == 'tag:add': if '*' not in tag: self.tagadd[(form, tag)].append(trig) else: # we have a glob add self.tagaddglobs[form].add(tag, trig) elif cond == 'tag:del': if '*' not in tag: self.tagdel[(form, tag)].append(trig) else: self.tagdelglobs[form].add(tag, trig) elif cond == 'edge:add': self.edgeaddcache.clear() if '*' not in verb: self.edgeadd[(form, verb, n2form)].append(trig) else: self.edgeaddglobs[(form, n2form)].add(verb, trig) elif cond == 'edge:del': self.edgedelcache.clear() if '*' not in verb: self.edgedel[(form, verb, n2form)].append(trig) else: self.edgedelglobs[(form, n2form)].add(verb, trig) self.triggers[trig.iden] = trig return trig
[docs] def list(self): return list(self.triggers.items())
[docs] def pop(self, iden): trig = self.triggers.pop(iden, None) if trig is None: return None cond = trig.tdef.get('cond') if cond == 'node:add': form = trig.tdef['form'] self.nodeadd[form].remove(trig) return trig if cond == 'node:del': form = trig.tdef['form'] self.nodedel[form].remove(trig) return trig if cond == 'prop:set': prop = trig.tdef.get('prop') self.propset[prop].remove(trig) return trig if cond == 'tag:add': tag = trig.tdef.get('tag') form = trig.tdef.get('form') if '*' not in tag: self.tagadd[(form, tag)].remove(trig) return trig globs = self.tagaddglobs.get(form) globs.rem(tag, trig) return trig if cond == 'tag:del': tag = trig.tdef['tag'] form = trig.tdef.get('form') if '*' not in tag: self.tagdel[(form, tag)].remove(trig) return trig globs = self.tagdelglobs.get(form) globs.rem(tag, trig) return trig if cond == 'edge:add': verb = trig.tdef['verb'] form = trig.tdef.get('form') n2form = trig.tdef.get('n2form') self.edgeaddcache.clear() if '*' not in verb: self.edgeadd[(form, verb, n2form)].remove(trig) return trig globs = self.edgeaddglobs.get((form, n2form)) globs.rem(verb, trig) return trig if cond == 'edge:del': verb = trig.tdef['verb'] form = trig.tdef.get('form') n2form = trig.tdef.get('n2form') self.edgedelcache.clear() if '*' not in verb: self.edgedel[(form, verb, n2form)].remove(trig) return trig globs = self.edgedelglobs.get((form, n2form)) globs.rem(verb, trig) return trig raise AssertionError('trigger has invalid condition')
[docs] def get(self, iden): return self.triggers.get(iden)
[docs]class Trigger: def __init__(self, view, tdef): self.view = view self.tdef = tdef self.iden = tdef.get('iden') self.lockwarned = False self.startcount = 0 self.errcount = 0 self.lasterrs = collections.deque((), maxlen=5) useriden = self.tdef.get('user') self.user = self.view.core.auth.user(useriden)
[docs] async def set(self, name, valu): ''' Set one of the dynamic elements of the trigger definition. ''' if name not in ('enabled', 'user', 'storm', 'doc', 'name', 'async'): raise s_exc.BadArg(mesg=f'Invalid key name provided: {name}') if valu == self.tdef.get(name): return if name == 'user': self.user = await self.view.core.auth.reqUser(valu) if name == 'storm': await self.view.core.getStormQuery(valu) self.tdef[name] = valu await self.view.trigdict.set(self.iden, self.tdef)
[docs] def get(self, name): return self.tdef.get(name)
[docs] async def execute(self, node, vars=None): ''' Actually execute the query ''' if not self.tdef.get('enabled'): return if self.tdef.get('async'): triginfo = {'buid': node.buid, 'trig': self.iden, 'vars': vars} await self.view.addTrigQueue(triginfo) return return await self._execute(node, vars=vars)
async def _execute(self, node, vars=None): locked = self.user.info.get('locked') if locked: if not self.lockwarned: self.lockwarned = True logger.warning(f'Skipping trigger execution because user {self.user.iden} is locked') return tag = self.tdef.get('tag') cond = self.tdef.get('cond') form = self.tdef.get('form') prop = self.tdef.get('prop') storm = self.tdef.get('storm') query = await self.view.core.getStormQuery(storm) if vars is None: vars = {} else: vars = vars.copy() autovars = vars.setdefault('auto', {}) autovars.update({'iden': self.iden, 'type': 'trigger'}) optvars = autovars.setdefault('opts', {}) optvars['form'] = node.ndef[0] optvars['valu'] = node.ndef[1] opts = {'vars': vars, 'user': self.user.iden, 'view': self.view.iden} self.startcount += 1 try: async with self.view.core.getStormRuntime(query, opts=opts) as runt: runt.addInput(node) await s_common.aspin(runt.execute()) except (asyncio.CancelledError, s_exc.RecursionLimitHit): raise except Exception as e: self.errcount += 1 self.lasterrs.append(str(e)) logger.exception('Trigger encountered exception running storm query %s', storm)
[docs] def pack(self): tdef = self.tdef.copy() useriden = tdef['user'] triguser = self.view.core.auth.user(useriden) tdef['startcount'] = self.startcount tdef['errcount'] = self.errcount tdef['lasterrs'] = list(self.lasterrs) if triguser is not None: tdef['username'] = triguser.name return tdef
[docs] def getStorNode(self, form): ndef = (form.name, form.type.norm(self.iden)[0]) buid = s_common.buid(ndef) props = { 'doc': self.tdef.get('doc', ''), 'name': self.tdef.get('name', ''), 'vers': self.tdef.get('ver', 1), 'cond': self.tdef.get('cond'), 'storm': self.tdef.get('storm'), 'enabled': self.tdef.get('enabled'), 'user': self.tdef.get('user'), '.created': self.tdef.get('created') } tag = self.tdef.get('tag') if tag is not None: props['tag'] = tag formprop = self.tdef.get('form') if formprop is not None: props['form'] = formprop prop = self.tdef.get('prop') if prop is not None: props['prop'] = prop verb = self.tdef.get('verb') if verb is not None: props['verb'] = verb n2form = self.tdef.get('n2form') if n2form is not None: props['n2form'] = n2form pnorms = {} for prop, valu in props.items(): formprop = form.props.get(prop) if formprop is not None and valu is not None: pnorms[prop] = formprop.type.norm(valu)[0] return (buid, { 'ndef': ndef, 'props': pnorms, })