import types
import asyncio
import decimal
import fnmatch
import hashlib
import logging
import binascii
import itertools
import contextlib
import collections
import regex
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.node as s_node
import synapse.lib.cache as s_cache
import synapse.lib.scope as s_scope
import synapse.lib.types as s_types
import synapse.lib.scrape as s_scrape
import synapse.lib.msgpack as s_msgpack
import synapse.lib.spooled as s_spooled
import synapse.lib.stormctrl as s_stormctrl
import synapse.lib.stormtypes as s_stormtypes
from synapse.lib.stormtypes import tobool, toint, toprim, tostr, tonumber, tocmprvalu, undef
logger = logging.getLogger(__name__)
[docs]
def parseNumber(x):
    '''
    Convert numeric text into a Storm number value.

    Text containing a ``.`` is passed to ``s_stormtypes.Number``; any other
    text is passed to ``s_stormtypes.intify``.
    '''
    if '.' in x:
        return s_stormtypes.Number(x)
    return s_stormtypes.intify(x)
[docs]
class AstNode:
    '''
    Base class for all nodes in the Storm abstract syntax tree.
    '''
    # set to True if recursive runt-safety checks should *not* recurse
    # into children of this node.
    runtopaque = False

    def __init__(self, astinfo, kids=()):
        '''
        Args:
            astinfo: Position info for this node (text, soff/eoff, sline/eline, scol/ecol).
            kids: Child AST nodes to attach, in order.
        '''
        self.kids = []
        self.astinfo = astinfo
        # cache of hasAstClass() answers keyed by class
        self.hasast = {}
        for kid in kids:
            self.addKid(kid)

    def getAstText(self):
        '''
        Return the slice of the source text covered by this node.
        '''
        return self.astinfo.text[self.astinfo.soff:self.astinfo.eoff]

    def getPosInfo(self):
        '''
        Return a dict describing where this node sits in the source text.

        The ``hash`` value is an md5 digest of the whole source text (not a
        security use); the other keys are (start, end) tuples.
        '''
        return {
            'hash': hashlib.md5(self.astinfo.text.encode(), usedforsecurity=False).hexdigest(),
            'lines': (self.astinfo.sline, self.astinfo.eline),
            'columns': (self.astinfo.scol, self.astinfo.ecol),
            'offsets': (self.astinfo.soff, self.astinfo.eoff),
        }

    def addExcInfo(self, exc):
        '''
        Add this node's position as ``highlight`` errinfo on exc (unless
        already present) and return exc.
        '''
        if 'highlight' not in exc.errinfo:
            exc.errinfo['highlight'] = self.getPosInfo()
        return exc

    def repr(self):
        return f'{self.__class__.__name__}: {self.kids}'

    def __repr__(self):
        return self.repr()

    def addKid(self, astn):
        '''
        Append astn as a child and record its parent and index.
        '''
        indx = len(self.kids)
        self.kids.append(astn)
        astn.parent = self
        astn.pindex = indx

    def sibling(self, offs=1):
        '''
        Return sibling node by relative offset from self.
        '''
        indx = self.pindex + offs
        if indx < 0:
            return None

        if indx >= len(self.parent.kids):
            return None

        return self.parent.kids[indx]

    def iterright(self):
        '''
        Yield "rightward" siblings until None.
        '''
        offs = 1
        while True:

            sibl = self.sibling(offs)
            if sibl is None:
                break

            yield sibl
            offs += 1

    def init(self, core):
        '''
        Initialize all children (depth first) and then prepare() this node.
        '''
        for kid in self.kids:
            kid.init(core)
        self.prepare()

    def validate(self, runt):
        '''
        Recursively validate the children of this node.
        '''
        for kid in self.kids:
            kid.validate(runt)

    def prepare(self):
        '''
        Hook called by init() after the children are initialized.
        '''
        pass

    def hasAstClass(self, clss):
        '''
        Check if any descendant is an instance of clss (result is cached).

        The search does not recurse into EditPropSet, Function or CmdOper
        children, although such a child itself may still match.
        '''
        hasast = self.hasast.get(clss)
        if hasast is not None:
            return hasast

        retn = False

        for kid in self.kids:

            if isinstance(kid, clss):
                retn = True
                break

            if isinstance(kid, (EditPropSet, Function, CmdOper)):
                continue

            if kid.hasAstClass(clss):
                retn = True
                break

        self.hasast[clss] = retn
        return retn

    def optimize(self):
        '''
        Recursively optimize the children of this node.
        '''
        for kid in self.kids:
            kid.optimize()

    def __iter__(self):
        for kid in self.kids:
            yield kid

    def getRuntVars(self, runt):
        '''
        Yield the runtime variable entries declared by all children.
        '''
        for kid in self.kids:
            yield from kid.getRuntVars(runt)

    def isRuntSafe(self, runt):
        '''
        Return True if every child is runtsafe (True when there are no children).
        '''
        return all(k.isRuntSafe(runt) for k in self.kids)

    def isRuntSafeAtom(self, runt):
        '''
        Return True if this single node (ignoring children) is runtsafe.
        '''
        return True

    def reqRuntSafe(self, runt, mesg):
        '''
        Raise StormRuntimeError(mesg) if any reachable node is not a runtsafe atom.

        Children of nodes flagged ``runtopaque`` are not inspected.
        '''
        todo = collections.deque([self])

        # breadth first search (FIFO queue) for a non-runtsafe atom.
        while todo:

            nkid = todo.popleft()
            if not nkid.isRuntSafeAtom(runt):
                raise nkid.addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

            if nkid.runtopaque:
                continue

            todo.extend(nkid.kids)

    def hasVarName(self, name):
        '''
        Return True if any child reports that it references the variable name.
        '''
        return any(k.hasVarName(name) for k in self.kids)
[docs]
class LookList(AstNode):
    '''
    A plain AstNode container with no additional behavior.
    '''
[docs]
class Query(AstNode):
    '''
    A pipeline of operators; each child consumes the generator produced by
    the previous one.
    '''
    def __init__(self, astinfo, kids=()):
        AstNode.__init__(self, astinfo, kids=kids)

        # for options parsed from the query itself
        self.opts = {}
        self.text = self.getAstText()

    async def run(self, runt, genr):
        '''
        Chain every child operator over genr and yield the resulting (node, path) tuples.

        Each intermediate generator is registered on an exit stack so that it
        is closed when this generator finishes or is closed early.
        '''
        async with contextlib.AsyncExitStack() as stack:

            pipeline = genr
            for oper in self.kids:
                stage = contextlib.aclosing(oper.run(runt, pipeline))
                pipeline = await stack.enter_async_context(stage)

            async for node, path in pipeline:
                runt.tick()
                yield node, path

    async def iterNodePaths(self, runt, genr=None):
        '''
        Optimize, validate and run the query, honoring the runtime ``limit`` option.

        Args:
            runt: The Storm runtime.
            genr: Optional input generator; defaults to ``runt.getInput()``.
        '''
        yielded = 0

        self.optimize()
        self.validate(runt)

        # turtles all the way down...
        if genr is None:
            genr = runt.getInput()

        async with contextlib.aclosing(self.run(runt, genr)) as agen:

            async for node, path in agen:

                runt.tick()
                yield node, path
                yielded += 1

                # the limit option is re-read after every yielded node
                limit = runt.getOpt('limit')
                if limit is None:
                    continue

                if yielded >= limit:
                    break
[docs]
class Lookup(Query):
    '''
    When storm input mode is "lookup"
    '''
    def __init__(self, astinfo, kids, autoadd=False):
        Query.__init__(self, astinfo, kids=kids)
        self.autoadd = autoadd

    async def run(self, runt, genr):
        '''
        Pass inbound items through, then yield nodes scraped from the lookup tokens.

        If a second child is present, the combined stream is piped through it.
        '''
        if runt.readonly and self.autoadd:
            mesg = 'Autoadd may not be executed in readonly Storm runtime.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        async def getnode(form, valu):
            # resolve (or, with autoadd, create) the node; bad values give None
            try:
                if self.autoadd:
                    runt.layerConfirm(('node', 'add', form))
                    return await runt.snap.addNode(form, valu)

                norm, info = runt.model.form(form).type.norm(valu)
                ndef = (form, norm)

                found = await runt.snap.getNodeByNdef(ndef)
                if found is None:
                    await runt.snap.fire('look:miss', ndef=ndef)

                return found

            except s_exc.BadTypeValu:
                return None

        async def lookgenr():

            async for inbound in genr:
                yield inbound

            tokns = [await kid.compute(runt, None) for kid in self.kids[0]]
            if not tokns:
                return

            for tokn in tokns:
                async for form, valu in s_scrape.scrapeAsync(tokn, first=True):
                    found = await getnode(form, valu)
                    if found is None:
                        continue
                    yield found, runt.initPath(found)

        realgenr = lookgenr()
        if len(self.kids) > 1:
            realgenr = self.kids[1].run(runt, realgenr)

        async for node, path in realgenr:
            yield node, path
[docs]
class Search(Query):
    '''
    A query whose first child supplies tokens for the Storm "search" interface.
    '''
    async def run(self, runt, genr):
        '''
        Pass inbound items through, then yield each distinct node found by the search interface.

        Warns and yields nothing if the search interface is disabled. If a
        second child is present, the combined stream is piped through it.
        '''
        view = runt.snap.view

        if not view.core.stormiface_search:
            await runt.snap.warn('Storm search interface is not enabled!', log=False)
            return

        async def searchgenr():

            async for inbound in genr:
                yield inbound

            tokns = [await kid.compute(runt, None) for kid in self.kids[0]]
            if not tokns:
                return

            async with await s_spooled.Set.anit(dirn=runt.snap.core.dirn, cell=runt.snap.core) as buidset:

                todo = s_common.todo('search', tokns)

                async for (prio, buid) in view.mergeStormIface('search', todo):

                    # only yield each buid once
                    if buid in buidset:
                        await asyncio.sleep(0)
                        continue

                    await buidset.add(buid)

                    found = await runt.snap.getNodeByBuid(buid)
                    if found is None:
                        continue

                    yield found, runt.initPath(found)

        realgenr = searchgenr()
        if len(self.kids) > 1:
            realgenr = self.kids[1].run(runt, realgenr)

        async for node, path in realgenr:
            yield node, path
[docs]
class SubGraph:
    '''
    An Oper like object which generates a subgraph.

    Notes:

        The rules format for the subgraph is shaped like the following::

                rules = {

                    'degrees': 1,

                    'edges': True,
                    'edgelimit': 3000,
                    'filterinput': True,
                    'yieldfiltered': False,

                    'filters': [
                        '-(#foo or #bar)',
                        '-(foo:bar or baz:faz)',
                    ],

                    'pivots': [
                        '-> * | limit 100',
                        '<- * | limit 100',
                    ],

                    'forms': {

                        'inet:fqdn':{
                            'filters': [],
                            'pivots': [],
                        },

                        '*': {
                            'filters': [],
                            'pivots': [],
                        },
                    },
                }

        Nodes which were original seeds have path.meta('graph:seed').

        All nodes have path.meta('edges') which is a list of (iden, info) tuples.

    '''

    def __init__(self, rules):
        '''
        Args:
            rules (dict): The subgraph rules. Missing keys are filled in
                with defaults *in place* on the provided dict.
        '''
        # cache of omit() answers keyed by node buid
        self.omits = {}
        self.rules = rules

        self.rules.setdefault('forms', {})
        self.rules.setdefault('pivots', ())
        self.rules.setdefault('filters', ())
        self.rules.setdefault('existing', ())

        self.rules.setdefault('refs', False)
        self.rules.setdefault('edges', True)
        self.rules.setdefault('degrees', 1)
        self.rules.setdefault('maxsize', 100000)
        self.rules.setdefault('edgelimit', 3000)

        self.rules.setdefault('filterinput', True)
        self.rules.setdefault('yieldfiltered', False)

    async def omit(self, runt, node):
        '''
        Return True if the node matches a global filter or a filter for its form.

        Form rules fall back to the ``*`` entry. Answers are cached by buid.
        '''
        answ = self.omits.get(node.buid)
        if answ is not None:
            return answ

        for filt in self.rules.get('filters'):
            if await node.filter(runt, filt):
                self.omits[node.buid] = True
                return True

        rules = self.rules['forms'].get(node.form.name)
        if rules is None:
            rules = self.rules['forms'].get('*')

        if rules is None:
            self.omits[node.buid] = False
            return False

        for filt in rules.get('filters', ()):
            if await node.filter(runt, filt):
                self.omits[node.buid] = True
                return True

        self.omits[node.buid] = False
        return False

    async def pivots(self, runt, node, path, existing):
        '''
        Yield (node, path, info) tuples for every pivot out of the given node.

        Sources, in order: property references (when the ``refs`` rule is
        set, including reverse references from ``existing`` nodes), the
        global ``pivots`` queries, and the ``pivots`` queries for the node's
        form (falling back to the ``*`` form rules).
        '''
        if self.rules.get('refs'):

            for propname, ndef in node.getNodeRefs():
                pivonode = await node.snap.getNodeByNdef(ndef)
                if pivonode is None:  # pragma: no cover
                    await asyncio.sleep(0)
                    continue

                link = {'type': 'prop', 'prop': propname}
                yield (pivonode, path.fork(pivonode, link), link)

            for iden in existing:
                buid = s_common.uhex(iden)
                othr = await node.snap.getNodeByBuid(buid)
                if othr is None:
                    # an "existing" iden which is not present in this view
                    await asyncio.sleep(0)
                    continue

                for propname, ndef in othr.getNodeRefs():
                    if ndef == node.ndef:
                        yield (othr, path, {'type': 'prop', 'prop': propname, 'reverse': True})

        for pivq in self.rules.get('pivots'):
            indx = 0
            # NOTE: use distinct loop names so the input node/path are not
            # rebound; later pivot queries and the form rule lookup below
            # must operate on the original node.
            async for pivn, pivp in node.storm(runt, pivq):
                yield pivn, pivp, {'type': 'rules', 'scope': 'global', 'index': indx}
                indx += 1

        scope = node.form.name

        rules = self.rules['forms'].get(scope)
        if rules is None:
            scope = '*'
            rules = self.rules['forms'].get(scope)

        if rules is None:
            return

        for pivq in rules.get('pivots', ()):
            indx = 0
            async for pivn, pivp in node.storm(runt, pivq):
                yield (pivn, pivp, {'type': 'rules', 'scope': scope, 'index': indx})
                indx += 1

    async def _edgefallback(self, runt, results, node):
        '''
        Yield edges between node and every buid in results, in both directions.
        '''
        async for buid01 in results:
            await asyncio.sleep(0)

            iden01 = s_common.ehex(buid01)
            async for verb in node.iterEdgeVerbs(buid01):
                await asyncio.sleep(0)
                yield (iden01, {'type': 'edge', 'verb': verb})

            # for existing nodes, we need to add n2 -> n1 edges in reverse
            async for verb in runt.snap.iterEdgeVerbs(buid01, node.buid):
                await asyncio.sleep(0)
                yield (iden01, {'type': 'edge', 'verb': verb, 'reverse': True})

    async def run(self, runt, genr):
        '''
        Yield (node, path) tuples for the projected subgraph.

        Inbound nodes are seeds (``graph:seed`` path metadata); pivoted nodes
        are queued up to ``degrees`` hops out. Each yielded path carries an
        ``edges`` metadata list of (iden, info) tuples.
        '''
        # NOTE: this function must aggressively yield the ioloop

        edgelimit = self.rules.get('edgelimit')
        doedges = self.rules.get('edges')
        degrees = self.rules.get('degrees')
        maxsize = self.rules.get('maxsize')
        existing = self.rules.get('existing')
        filterinput = self.rules.get('filterinput')
        yieldfiltered = self.rules.get('yieldfiltered')

        self.user = runt.user

        todo = collections.deque()

        async with contextlib.AsyncExitStack() as stack:
            core = runt.snap.core

            done = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
            intodo = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
            results = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
            revpivs = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))

            revedge = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))
            edgecounts = await stack.enter_async_context(await s_spooled.Dict.anit(dirn=core.dirn, cell=core))
            n1delayed = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))
            n2delayed = await stack.enter_async_context(await s_spooled.Set.anit(dirn=core.dirn, cell=core))

            # load the existing graph as already done
            for b in existing:
                await results.add(s_common.uhex(b))

            if doedges:
                for b in existing:
                    ecnt = 0
                    cache = collections.defaultdict(list)
                    async for verb, n2iden in runt.snap.iterNodeEdgesN1(s_common.uhex(b)):
                        await asyncio.sleep(0)

                        if s_common.uhex(n2iden) in results:
                            continue

                        ecnt += 1
                        if ecnt > edgelimit:
                            break

                        cache[n2iden].append(verb)

                    if ecnt > edgelimit:
                        # don't let it into the cache.
                        # We've hit a potential death star and need to deal with it specially
                        await n1delayed.add(b)
                        continue

                    for n2iden, verbs in cache.items():
                        await asyncio.sleep(0)
                        if n2delayed.has(n2iden):
                            continue

                        if not revedge.has(n2iden):
                            await revedge.set(n2iden, {})

                        re = revedge.get(n2iden)
                        if b not in re:
                            re[b] = []

                        n2count = edgecounts.get(n2iden, defv=0) + len(verbs)
                        if n2count > edgelimit:
                            await n2delayed.add(n2iden)
                            revedge.pop(n2iden)
                        else:
                            await edgecounts.set(n2iden, n2count)
                            re[b] += verbs
                            await revedge.set(n2iden, re)

            async def todogenr():

                async for node, path in genr:
                    path.meta('graph:seed', True)
                    yield node, path, 0

                while todo:
                    yield todo.popleft()

            # number of distinct nodes processed; compared against maxsize
            count = 0
            async for node, path, dist in todogenr():

                await asyncio.sleep(0)

                buid = node.buid
                if buid in done:
                    continue

                count += 1

                if count > maxsize:
                    await runt.snap.warn(f'Graph projection hit max size {maxsize}. Truncating results.')
                    break

                await done.add(buid)
                intodo.discard(buid)

                omitted = False
                if dist > 0 or filterinput:
                    omitted = await self.omit(runt, node)

                if omitted and not yieldfiltered:
                    continue

                # we must traverse the pivots for the node *regardless* of degrees
                # due to needing to tie any leaf nodes to nodes that were already yielded

                nodeiden = node.iden()
                edges = list(revpivs.get(buid, defv=()))
                async for pivn, pivp, pinfo in self.pivots(runt, node, path, existing):

                    await asyncio.sleep(0)

                    if results.has(pivn.buid):
                        edges.append((pivn.iden(), pinfo))
                    else:
                        pinfo['reverse'] = True
                        pivedges = revpivs.get(pivn.buid, defv=())
                        await revpivs.set(pivn.buid, pivedges + ((nodeiden, pinfo),))

                    # we dont pivot from omitted nodes
                    if omitted:
                        continue

                    # no need to pivot to nodes we already did
                    if pivn.buid in done:
                        continue

                    # no need to queue up todos that are already in todo
                    if pivn.buid in intodo:
                        continue

                    # no need to pivot to existing nodes
                    if pivn.iden() in existing:
                        continue

                    # do we have room to go another degree out?
                    if degrees is None or dist < degrees:
                        todo.append((pivn, pivp, dist + 1))
                        await intodo.add(pivn.buid)

                if doedges:
                    ecnt = 0
                    cache = collections.defaultdict(list)
                    await results.add(buid)
                    # Try to lift and cache the potential edges for a node so that if we end up
                    # seeing n2 later, we won't have to go back and check for it
                    async for verb, n2iden in runt.snap.iterNodeEdgesN1(buid):
                        await asyncio.sleep(0)
                        if ecnt > edgelimit:
                            break

                        ecnt += 1
                        cache[n2iden].append(verb)

                    if ecnt > edgelimit:
                        # The current node in the pipeline has too many edges from it, so it's
                        # less prohibitive to just check against the graph
                        await n1delayed.add(nodeiden)
                        async for e in self._edgefallback(runt, results, node):
                            edges.append(e)

                    else:
                        for n2iden, verbs in cache.items():
                            await asyncio.sleep(0)

                            if n2delayed.has(n2iden):
                                continue

                            if not revedge.has(n2iden):
                                await revedge.set(n2iden, {})

                            re = revedge.get(n2iden)
                            if nodeiden not in re:
                                re[nodeiden] = []

                            # NOTE: this must not reuse the name "count", which
                            # tracks the number of nodes for the maxsize check.
                            n2count = edgecounts.get(n2iden, defv=0) + len(verbs)
                            if n2count > edgelimit:
                                await n2delayed.add(n2iden)
                                revedge.pop(n2iden)
                            else:
                                await edgecounts.set(n2iden, n2count)
                                re[nodeiden] += verbs
                                await revedge.set(n2iden, re)

                        if revedge.has(nodeiden):
                            for n2iden, verbs in revedge.get(nodeiden).items():
                                for verb in verbs:
                                    await asyncio.sleep(0)
                                    edges.append((n2iden, {'type': 'edge', 'verb': verb, 'reverse': True}))

                        if n2delayed.has(nodeiden):
                            async for buid01 in results:
                                async for verb in runt.snap.iterEdgeVerbs(buid01, buid):
                                    await asyncio.sleep(0)
                                    edges.append((s_common.ehex(buid01), {'type': 'edge', 'verb': verb, 'reverse': True}))

                        for n2iden, verbs in cache.items():
                            if s_common.uhex(n2iden) not in results:
                                continue

                            for v in verbs:
                                await asyncio.sleep(0)
                                edges.append((n2iden, {'type': 'edge', 'verb': v}))

                        async for n1iden in n1delayed:
                            n1buid = s_common.uhex(n1iden)
                            async for verb in runt.snap.iterEdgeVerbs(n1buid, buid):
                                await asyncio.sleep(0)
                                edges.append((n1iden, {'type': 'edge', 'verb': verb, 'reverse': True}))

                path.metadata['edges'] = edges
                yield node, path
[docs]
class Oper(AstNode):
    '''
    Base class for the operator nodes defined in this module.
    '''
[docs]
class SubQuery(Oper):
    '''
    A nested query which may run per inbound item, inline, or as a value.
    '''
    def __init__(self, astinfo, kids=()):
        Oper.__init__(self, astinfo, kids)
        self.hasyield = False
        self.hasretn = self.hasAstClass(Return)

        # text of the inner query (empty when there are no kids)
        self.text = kids[0].getAstText() if len(kids) else ''

    def isRuntSafe(self, runt):
        return True

    async def run(self, runt, genr):
        '''
        Run the inner query once per inbound item, then yield the item.

        Inner results are only yielded when ``hasyield`` is set.
        '''
        subq = self.kids[0]

        async for item in genr:

            last = None
            async for last in subq.run(runt, s_common.agen(item)):
                if self.hasyield:
                    yield last

            # dup any path variables from the last yielded
            if last is not None:
                item[1].vars.update(last[1].vars)

            yield item

    async def inline(self, runt, genr):
        '''
        Operate subquery as if it were inlined
        '''
        async for item in self.kids[0].run(runt, genr):
            yield item

    async def _compute(self, runt, path, limit):
        '''
        Execute the inner query in a sub-runtime and return the yielded nodes.

        Raises:
            s_exc.BadTypeValu: If more than limit nodes are yielded.
        '''
        nodes = []

        async with runt.getSubRuntime(self.kids[0]) as subr:

            async for valunode, valupath in subr.execute():

                nodes.append(valunode)
                if len(nodes) <= limit:
                    continue

                query = self.kids[0].text
                mesg = f'Subquery used as a value yielded too many (>{limit}) nodes. {s_common.trimText(query)}'
                raise self.addExcInfo(s_exc.BadTypeValu(mesg=mesg, text=query))

        return nodes

    async def compute(self, runt, path):
        '''
        Use subquery as a value.

        Returns the returned value if the subquery executes a return
        statement, None if it yields no nodes, or otherwise the single node
        yielded. Yielding more than one node raises BadTypeValu.
        '''
        try:
            nodes = await self._compute(runt, path, 1)
        except s_stormctrl.StormReturn as e:
            # a subquery assignment with a return; just use the returned value
            return e.item

        if not nodes:
            return None

        return nodes[0]

    async def compute_array(self, runt, path):
        '''
        Use subquery as an array.
        '''
        try:
            nodes = await self._compute(runt, path, 128)
        except s_stormctrl.StormReturn as e:
            # a subquery assignment with a return; just use the returned value
            return e.item

        return nodes
[docs]
class InitBlock(AstNode):
    '''
    An AST node that runs only once before yielding nodes.

    Example:

        Using a init block::

            init {
                // stuff here runs *once* before the first node yield (even if there are no nodes)
            }

    '''

    async def run(self, runt, genr):
        '''
        Run the block once, before the first inbound item (or at the end if none arrive).
        '''
        subq = self.kids[0]
        self.reqRuntSafe(runt, 'Init block query must be runtsafe')

        pending = True

        async for item in genr:

            if pending:
                async for innr in subq.run(runt, s_common.agen()):
                    yield innr
                pending = False

            yield item

        # there were no inbound items, so the block has not run yet
        if pending:
            async for innr in subq.run(runt, s_common.agen()):
                yield innr
[docs]
class EmptyBlock(AstNode):
    '''
    An AST node that only runs if there are not inbound nodes in the pipeline. It is
    capable of yielding nodes into the pipeline.

    Example:

        Using an empty block::

            empty {
                // the pipeline is empty so this block will execute
            }

            [foo:bar=*]
            empty {
                // there is a node in the pipeline so this block will not run
            }
    '''

    async def run(self, runt, genr):
        '''
        Pass inbound items through; run the block only if there were none.
        '''
        subq = self.kids[0]
        self.reqRuntSafe(runt, 'Empty block query must be runtsafe')

        sawitem = False
        async for item in genr:
            sawitem = True
            yield item

        if sawitem:
            return

        async for subn in subq.run(runt, s_common.agen()):
            yield subn
[docs]
class FiniBlock(AstNode):
    '''
    An AST node that runs only once after all nodes have been consumed.

    Example:

        Using a fini block::

            fini {
               // stuff here runs *once* after the last node yield (even if there are no nodes)
            }

    Notes:
        A fini block must be runtsafe.

    '''

    async def run(self, runt, genr):
        '''
        Pass every inbound item through, then run the block once.
        '''
        block = self.kids[0]
        self.reqRuntSafe(runt, 'Fini block query must be runtsafe')

        async for inbound in genr:
            yield inbound

        async for innr in block.run(runt, s_common.agen()):
            yield innr
[docs]
class TryCatch(AstNode):
    '''
    Runs the first child as the guarded block; the remaining children are catch blocks.
    '''

    async def run(self, runt, genr):
        '''
        Run the guarded block per inbound item (or once if there are none),
        dispatching SynErr exceptions to the first matching catch block.
        '''
        tryblock = self.kids[0]
        seen = 0

        async for item in genr:

            seen += 1
            path = item[1]

            try:
                agen = s_common.agen(item)
                async for subi in tryblock.run(runt, agen):
                    yield subi

            except s_exc.SynErr as e:

                block = await self.getCatchBlock(e.errname, runt, path=path)
                if block is None:
                    raise

                # the error is exposed to the catch block as a path variable
                await path.setVar(block.errvar(), await self.getErrValu(e))

                agen = s_common.agen(item)
                async for subi in block.run(runt, agen):
                    yield subi

        if seen != 0:
            return

        try:
            async for item in tryblock.run(runt, genr):
                yield item

        except s_exc.SynErr as e:

            block = await self.getCatchBlock(e.errname, runt)
            if block is None:
                raise

            # no path here, so the error is exposed as a runtime variable
            await runt.setVar(block.errvar(), await self.getErrValu(e))

            async for item in block.run(runt, s_common.agen()):
                yield item

    async def getErrValu(self, e):
        '''
        Build the {'name', 'mesg', 'info'} dict for an exception.

        NOTE: this pops ``mesg`` out of the exception's errinfo.
        '''
        mesg = e.errinfo.pop('mesg', 'No message given.')
        info = await s_stormtypes.toprim(e.errinfo)
        return {'name': e.errname, 'mesg': mesg, 'info': info}

    async def getCatchBlock(self, name, runt, path=None):
        '''
        Return the first catch block which catches name, or None.
        '''
        for catchblock in self.kids[1:]:
            if await catchblock.catches(name, runt, path=path):
                return catchblock
        return None
[docs]
class CatchBlock(AstNode):
    '''
    A catch clause: kids are (error name value, error variable, body).
    '''

    async def run(self, runt, genr):
        body = self.kids[2]
        async for item in body.run(runt, genr):
            yield item

    def getRuntVars(self, runt):
        yield (self.errvar(), True)
        yield from self.kids[2].getRuntVars(runt)

    def errvar(self):
        '''
        Return the name of the variable which receives the error value.
        '''
        return self.kids[1].value()

    async def catches(self, name, runt, path=None):
        '''
        Return True if this block handles the error name.

        The computed catch value may be ``*`` (matches anything), a single
        name, or a list/tuple of names.

        Raises:
            s_exc.StormRuntimeError: If the catch value is any other type.
        '''
        catchvalu = await s_stormtypes.toprim(await self.kids[0].compute(runt, path))

        if isinstance(catchvalu, str):
            return catchvalu == '*' or catchvalu == name

        if isinstance(catchvalu, (list, tuple)):
            return any(catchname == name for catchname in catchvalu)

        etyp = catchvalu.__class__.__name__
        mesg = f'catch block must be a str or list object. {etyp} not allowed.'
        raise self.kids[0].addExcInfo(s_exc.StormRuntimeError(mesg=mesg, type=etyp))
[docs]
class ForLoop(Oper):
    '''
    A for loop: kids are (variable name or VarList, iterable value, body subquery).
    '''

    def getRuntVars(self, runt):
        runtsafe = self.kids[1].isRuntSafe(runt)

        target = self.kids[0]
        if isinstance(target, VarList):
            for name in target.value():
                yield name, runtsafe
        else:
            yield target.value(), runtsafe

        yield from self.kids[2].getRuntVars(runt)

    def _toIterValu(self, valu):
        # Normalize the computed loop value into something s_coro.agen() accepts.
        if isinstance(valu, s_stormtypes.Prim):
            # returns an async genr instance...
            valu = valu.iter()

        if isinstance(valu, dict):
            valu = list(valu.items())

        if valu is None:
            valu = ()

        return valu

    def _reqUnpackable(self, names, item):
        # Raise StormVarListError unless item has exactly len(names) elements.
        try:
            numitems = len(item)
        except TypeError:
            mesg = f'Number of items to unpack does not match the number of variables: {s_common.trimText(repr(item))}'
            exc = s_exc.StormVarListError(mesg=mesg, names=names)
            raise self.kids[1].addExcInfo(exc)

        if len(names) != numitems:
            mesg = f'Number of items to unpack does not match the number of variables: {s_common.trimText(repr(item))}'
            exc = s_exc.StormVarListError(mesg=mesg, names=names, numitems=numitems)
            raise self.kids[1].addExcInfo(exc)

    async def run(self, runt, genr):
        '''
        Run the body once per element of the iterable, for each inbound node.

        With no inbound nodes and a runtsafe iterable, the loop runs once
        with only runtime variables set.
        '''
        subq = self.kids[2]
        name = self.kids[0].value()
        unpack = isinstance(name, (list, tuple))
        node = None

        async for node, path in genr:

            # TODO: remove when storm is all objects
            valu = self._toIterValu(await self.kids[1].compute(runt, path))

            async with contextlib.aclosing(s_coro.agen(valu)) as agen:
                async for item in agen:

                    if unpack:

                        self._reqUnpackable(name, item)

                        if isinstance(item, s_stormtypes.Prim):
                            item = await item.value()

                        for x, y in itertools.zip_longest(name, item):
                            await path.setVar(x, y)
                            await runt.setVar(x, y)

                    else:
                        # set both so inner subqueries have it in their runtime
                        await path.setVar(name, item)
                        await runt.setVar(name, item)

                    try:

                        # since it's possible to "multiply" the (node, path)
                        # we must make a clone of the path to prevent yield-then-use.
                        newg = s_common.agen((node, path.clone()))
                        async for subitem in subq.inline(runt, newg):
                            yield subitem

                    except s_stormctrl.StormBreak as e:
                        if e.item is not None:
                            yield e.item
                        break

                    except s_stormctrl.StormContinue as e:
                        if e.item is not None:
                            yield e.item
                        continue

                    finally:
                        # for loops must yield per item they iterate over
                        await asyncio.sleep(0)

        # no nodes and a runt safe value should execute once
        if node is None and self.kids[1].isRuntSafe(runt):

            valu = self._toIterValu(await self.kids[1].compute(runt, None))

            async with contextlib.aclosing(s_coro.agen(valu)) as agen:
                async for item in agen:

                    if unpack:

                        self._reqUnpackable(name, item)

                        if isinstance(item, s_stormtypes.Prim):
                            item = await item.value()

                        for x, y in itertools.zip_longest(name, item):
                            await runt.setVar(x, y)

                    else:
                        await runt.setVar(name, item)

                    try:
                        async for jtem in subq.inline(runt, s_common.agen()):
                            yield jtem

                    except s_stormctrl.StormBreak as e:
                        if e.item is not None:
                            yield e.item
                        break

                    except s_stormctrl.StormContinue as e:
                        if e.item is not None:
                            yield e.item
                        continue

                    finally:
                        # for loops must yield per item they iterate over
                        await asyncio.sleep(0)
[docs]
class WhileLoop(Oper):
    '''
    A while loop: kids are (condition value, body subquery).
    '''

    async def run(self, runt, genr):
        '''
        Re-run the body while the condition is true, for each inbound node.

        With no inbound nodes and a runtsafe condition, the loop runs
        without a node or path.
        '''
        cond = self.kids[0]
        body = self.kids[1]
        node = None

        async for node, path in genr:

            while await tobool(await cond.compute(runt, path)):
                try:

                    newg = s_common.agen((node, path))
                    async for item in body.inline(runt, newg):
                        yield item
                        await asyncio.sleep(0)

                except s_stormctrl.StormBreak as e:
                    if e.item is not None:
                        yield e.item
                    break

                except s_stormctrl.StormContinue as e:
                    if e.item is not None:
                        yield e.item
                    continue

                finally:
                    # while loops must yield each time they loop
                    await asyncio.sleep(0)

        # no nodes and a runt safe value should execute once
        if node is None and cond.isRuntSafe(runt):

            while await tobool(await cond.compute(runt, None)):

                try:
                    async for jtem in body.inline(runt, s_common.agen()):
                        yield jtem
                        await asyncio.sleep(0)

                except s_stormctrl.StormBreak as e:
                    if e.item is not None:
                        yield e.item
                    break

                except s_stormctrl.StormContinue as e:
                    if e.item is not None:
                        yield e.item
                    continue

                finally:
                    # while loops must yield each time they loop
                    await asyncio.sleep(0)
[docs]
async def pullone(genr):
    '''
    Pull the first item from an async generator without losing it.

    Args:
        genr: The async generator to pull from.

    Returns:
        tuple: (newgenr, empty) where newgenr replays the pulled item
        followed by the remainder of genr, and empty is True if genr
        produced no items at all.
    '''
    # a private sentinel (rather than None) so that a stream whose first
    # item is None is not mistaken for an empty stream.
    nothing = object()

    gotone = nothing
    async for gotone in genr:
        break

    async def pullgenr():

        if gotone is nothing:
            return

        yield gotone

        async for item in genr:
            yield item

    return pullgenr(), gotone is nothing
[docs]
class CmdOper(Oper):
    '''
    Executes a Storm command: kids are (command name, argv value).
    '''

    async def run(self, runt, genr):
        '''
        Construct the named command, feed it argv, and yield what it yields.

        Raises:
            s_exc.NoSuchName: If the command is not found.
            s_exc.IsReadOnly: If the runtime is readonly and the command is not readonly safe.
        '''
        namekid, argvkid = self.kids[0], self.kids[1]

        name = namekid.value()

        ctor = runt.snap.core.getStormCmd(name)
        if ctor is None:
            mesg = f'Storm command ({name}) not found.'
            exc = s_exc.NoSuchName(name=name, mesg=mesg)
            raise namekid.addExcInfo(exc)

        runtsafe = argvkid.isRuntSafe(runt)

        scmd = ctor(runt, runtsafe)

        if runt.readonly and not scmd.isReadOnly():
            mesg = f'Command ({name}) is not marked safe for readonly use.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        async def genx():
            # re-compute and set argv for every inbound node before passing it on
            async for node, path in genr:
                argv = await argvkid.compute(runt, path)
                if not await scmd.setArgv(argv):
                    raise s_stormctrl.StormExit()

                yield node, path

        # must pull through the genr to get opts set
        # ( many commands expect self.opts is set at run() )
        pulled, empty = await pullone(genx())

        try:
            if runtsafe:
                argv = await argvkid.compute(runt, None)
                if not await scmd.setArgv(argv):
                    raise s_stormctrl.StormExit()

            if runtsafe or not empty:
                async with contextlib.aclosing(scmd.execStormCmd(runt, pulled)) as agen:
                    async for item in agen:
                        yield item

        finally:
            await pulled.aclose()
[docs]
class SetVarOper(Oper):
    '''
    Variable assignment: kids are (variable name, value).
    '''

    async def run(self, runt, genr):
        '''
        Set (or, for ``undef``, remove) the variable on the runtime and each path.

        With no inbound nodes and a runtsafe value, only the runtime is updated.
        '''
        name = self.kids[0].value()
        vkid = self.kids[1]

        anynodes = False

        async for node, path in genr:

            anynodes = True

            valu = await vkid.compute(runt, path)

            # TODO detect which (runt or path) to update here
            if valu is undef:
                await runt.popVar(name)
                await path.popVar(name)
            else:
                await runt.setVar(name, valu)
                await path.setVar(name, valu)

            yield node, path

        if anynodes or not vkid.isRuntSafe(runt):
            return

        valu = await vkid.compute(runt, None)
        if valu is undef:
            await runt.popVar(name)
        else:
            await runt.setVar(name, valu)

    def getRuntVars(self, runt):
        '''
        Yield (name, runtsafe) for the assigned variable, then the kids' variables.

        Raises:
            s_exc.NoSuchVar: If the value references the variable being
                assigned and it is not yet in runt.runtvars.
        '''
        name = self.kids[0].value()
        vkid = self.kids[1]

        if runt.runtvars.get(name) is None and vkid.hasVarName(name):
            exc = s_exc.NoSuchVar(mesg=f'Missing variable: {name}', name=name)
            raise self.kids[0].addExcInfo(exc)

        yield name, vkid.isRuntSafe(runt)

        for kid in self.kids:
            yield from kid.getRuntVars(runt)
[docs]
class SetItemOper(Oper):
    '''
    $foo.bar = baz
    $foo."bar baz" = faz
    $foo.$bar = baz
    '''

    async def run(self, runt, genr):
        '''
        Compute the target, key and value, then call ``setitem`` on the target.

        Raises:
            s_exc.IsReadOnly: If the runtime is readonly and the target's
                setitem is not marked ``_storm_readonly``.
        '''
        objkid, namekid, valukid = self.kids[0], self.kids[1], self.kids[2]

        anynodes = False

        async for node, path in genr:

            anynodes = True

            item = s_stormtypes.fromprim(await objkid.compute(runt, path), basetypes=False)

            if runt.readonly and not getattr(item.setitem, '_storm_readonly', False):
                mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
                raise objkid.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

            name = await namekid.compute(runt, path)
            valu = await valukid.compute(runt, path)

            # TODO: ditch this when storm goes full heavy object
            with s_scope.enter({'runt': runt}):
                await item.setitem(name, valu)

            yield node, path

        if anynodes or not self.isRuntSafe(runt):
            return

        item = s_stormtypes.fromprim(await objkid.compute(runt, None), basetypes=False)

        name = await namekid.compute(runt, None)
        valu = await valukid.compute(runt, None)

        if runt.readonly and not getattr(item.setitem, '_storm_readonly', False):
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise objkid.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        # TODO: ditch this when storm goes full heavy object
        with s_scope.enter({'runt': runt}):
            await item.setitem(name, valu)
[docs]
class VarListSetOper(Oper):
    '''
    Unpacking assignment: kids are (variable name list, value).
    '''

    async def run(self, runt, genr):
        '''
        Assign the elements of the computed value to the variable names.

        Raises:
            s_exc.StormVarListError: If the value has fewer elements than names.
        '''
        names = self.kids[0].value()
        vkid = self.kids[1]

        sawnode = False

        async for node, path in genr:

            sawnode = True

            valus = [v async for v in s_stormtypes.toiter(await vkid.compute(runt, path))]

            # NOTE(review): the condition detects too *few* items while the
            # message wording says "more items"; text is left unchanged.
            if len(valus) < len(names):
                mesg = f'Attempting to assign more items than we have variables to assign to: {s_common.trimText(repr(valus))}'
                exc = s_exc.StormVarListError(mesg=mesg, names=names, numitems=len(valus))
                raise self.kids[0].addExcInfo(exc)

            for name, valu in zip(names, valus):
                await runt.setVar(name, valu)
                await path.setVar(name, valu)

            yield node, path

        if sawnode or not vkid.isRuntSafe(runt):
            return

        valus = [v async for v in s_stormtypes.toiter(await vkid.compute(runt, None))]

        if len(valus) < len(names):
            mesg = f'Attempting to assign more items than we have variables to assign to: {s_common.trimText(repr(valus))}'
            exc = s_exc.StormVarListError(mesg=mesg, names=names, numitems=len(valus))
            raise self.kids[0].addExcInfo(exc)

        for name, valu in zip(names, valus):
            await runt.setVar(name, valu)

        async for item in genr:
            yield item

        return

    def getRuntVars(self, runt):
        runtsafe = self.kids[1].isRuntSafe(runt)
        for name in self.kids[0].value():
            yield name, runtsafe
[docs]
class VarEvalOper(Oper):
    '''
    Facilitate a stand-alone operator that evaluates a var.
    $foo.bar("baz")
    '''

    async def run(self, runt, genr):
        '''
        Evaluate the expression for each inbound node, discarding the result.

        With no inbound nodes and a runtsafe expression it is evaluated once;
        if the result is an async generator it is drained.
        '''
        expr = self.kids[0]

        sawnode = False
        async for node, path in genr:
            sawnode = True
            await expr.compute(runt, path)
            yield node, path

        if sawnode or not self.isRuntSafe(runt):
            return

        valu = await expr.compute(runt, None)
        if not isinstance(valu, types.AsyncGeneratorType):
            return

        async for item in valu:
            await asyncio.sleep(0)
[docs]
class SwitchCase(Oper):
    '''
    A switch statement: the first kid is the switch value, the rest are case entries.
    '''

    def prepare(self):
        '''
        Build the case value -> subquery map and record the default case.
        '''
        self.cases = {}
        self.defcase = None

        for cent in self.kids[1:]:

            # a case entry is zero or more values followed by its subquery
            *vals, subq = cent.kids

            if cent.defcase:
                self.defcase = subq
                continue

            self.cases.update({valu.value(): subq for valu in vals})

    async def run(self, runt, genr):
        '''
        Route each inbound node through the matching case (or the default).

        Nodes with no matching case and no default are passed through.
        '''
        valukid = self.kids[0]
        sawnode = False

        async for node, path in genr:

            sawnode = True

            varv = await valukid.compute(runt, path)

            # TODO: when we have var type system, do type-aware comparison
            subq = self.cases.get(await s_stormtypes.tostr(varv))
            if subq is None:
                subq = self.defcase

            if subq is None:
                yield (node, path)
                continue

            async for item in subq.inline(runt, s_common.agen((node, path))):
                yield item

        if sawnode or not valukid.isRuntSafe(runt):
            return

        # no nodes and a runt safe value should execute
        varv = await valukid.compute(runt, None)

        subq = self.cases.get(await s_stormtypes.tostr(varv))
        if subq is None:
            subq = self.defcase

        if subq is None:
            return

        async for item in subq.inline(runt, s_common.agen()):
            yield item
[docs]
class CaseEntry(AstNode):
    '''
    A single case of a switch statement; ``defcase`` marks the default case.
    '''
    def __init__(self, astinfo, kids=(), defcase=False):
        AstNode.__init__(self, astinfo, kids=kids)
        self.defcase = defcase
[docs]
class LiftOper(Oper):
    '''
    Base class for lift operators; subclasses implement lift().
    '''
    def __init__(self, astinfo, kids=()):
        Oper.__init__(self, astinfo, kids=kids)
        self.reverse = False

    def reverseLift(self, astinfo):
        '''
        Mark this lift as reversed and replace its position info.
        '''
        self.astinfo = astinfo
        self.reverse = True

    def getPivNames(self, runt, prop, pivs):
        '''
        Resolve a chain of pivot names, starting at prop's type, into full property names.
        '''
        fulls = []
        typename = prop.type.name

        for piv in pivs:
            pivprop = runt.model.reqProp(f'{typename}:{piv}', extra=self.kids[0].addExcInfo)
            fulls.append(pivprop.full)
            # the next pivot is relative to the type of this property
            typename = pivprop.type.name

        return fulls

    async def pivlift(self, runt, props, pivnames, genr):
        '''
        Walk the pivot chain backwards from genr and yield nodes from props by value.
        '''
        async def pivvals(prop, pivgenr):
            async for pivnode in pivgenr:
                async for pivo in runt.snap.nodesByPropValu(prop, '=', pivnode.ndef[1], reverse=self.reverse):
                    yield pivo

        # every pivot name except the last, in reverse order
        for pivname in pivnames[-2::-1]:
            genr = pivvals(pivname, genr)

        async for srcnode in genr:
            valu = srcnode.ndef[1]
            for prop in props:
                async for node in runt.snap.nodesByPropValu(prop.full, '=', valu, reverse=self.reverse):
                    yield node

    async def run(self, runt, genr):
        '''
        Pass inbound items through and add lifted nodes.

        A runtsafe lift runs once after all inbound items; otherwise the
        lift runs after each inbound node using its path.
        '''
        if not self.isRuntSafe(runt):

            link = {'type': 'runtime'}
            async for node, path in genr:

                yield node, path

                async for subn in self.lift(runt, path):
                    yield subn, path.fork(subn, link)

            return

        # runtime safe lift operation
        async for item in genr:
            yield item

        async for node in self.lift(runt, None):
            yield node, runt.initPath(node)

    async def lift(self, runt, path):  # pragma: no cover
        raise NotImplementedError('Must define lift(runt, path)')
[docs]
class YieldValu(Oper):
    '''
    Yield the nodes referenced by the value of the first kid.
    '''

    async def run(self, runt, genr):
        '''
        For each inbound node, yield the nodes its value refers to and then
        the node itself.  With no inbound nodes and a runtime safe value,
        the value is evaluated once with no path.
        '''
        node = None

        async for node, path in genr:
            valu = await self.kids[0].compute(runt, path)
            async with contextlib.aclosing(self.yieldFromValu(runt, valu)) as agen:
                async for subn in agen:
                    yield subn, runt.initPath(subn)
            yield node, path

        if node is not None or not self.kids[0].isRuntSafe(runt):
            return

        valu = await self.kids[0].compute(runt, None)
        async with contextlib.aclosing(self.yieldFromValu(runt, valu)) as agen:
            async for subn in agen:
                yield subn, runt.initPath(subn)

    def _reqViewNode(self, node, viewiden):
        '''
        Return node, raising BadLiftValu if it is not from the view viewiden.
        '''
        if node.snap.view.iden != viewiden:
            mesg = f'Node is not from the current view. Node {node.iden()} is from {node.snap.view.iden} expected {viewiden}'
            raise s_exc.BadLiftValu(mesg=mesg)
        return node

    async def yieldFromValu(self, runt, valu):
        '''
        Recursively turn a value into nodes.

        Handles None, buid bytes, hex iden strings, (async) generators,
        list/tuple/set containers, storm Node / List / Set / Prim objects
        and raw nodes.  Any other value yields nothing.
        '''
        viewiden = runt.snap.view.iden

        # there is nothing in None... ;)
        if valu is None:
            return

        # a little DWIM on what we get back...
        # ( most common case will be stormtypes libs agenr -> iden|buid )
        if isinstance(valu, (bytes, str)):
            buid = valu
            if isinstance(valu, str):
                # strings must be hex encoded idens
                try:
                    buid = s_common.uhex(valu)
                except binascii.Error:
                    mesg = 'Yield string must be iden in hexdecimal. Got: %r' % (valu,)
                    raise self.kids[0].addExcInfo(s_exc.BadLiftValu(mesg=mesg))

            node = await runt.snap.getNodeByBuid(buid)
            if node is not None:
                yield node
            return

        if isinstance(valu, types.AsyncGeneratorType):
            try:
                async for item in valu:
                    async for node in self.yieldFromValu(runt, item):
                        yield node
            finally:
                await valu.aclose()
            return

        if isinstance(valu, types.GeneratorType):
            try:
                for item in valu:
                    async for node in self.yieldFromValu(runt, item):
                        yield node
            finally:
                valu.close()
            return

        if isinstance(valu, (list, tuple, set)):
            for item in valu:
                async for node in self.yieldFromValu(runt, item):
                    yield node
            return

        if isinstance(valu, s_stormtypes.Node):
            # unwrap the storm object to the underlying node
            yield self._reqViewNode(valu.valu, viewiden)
            return

        if isinstance(valu, s_node.Node):
            yield self._reqViewNode(valu, viewiden)
            return

        if isinstance(valu, (s_stormtypes.List, s_stormtypes.Set)):
            for item in valu.valu:
                async for node in self.yieldFromValu(runt, item):
                    yield node
            return

        if isinstance(valu, s_stormtypes.Prim):
            async with contextlib.aclosing(valu.nodes()) as genr:
                async for node in genr:
                    yield self._reqViewNode(node, viewiden)
            return
[docs]
class LiftTag(LiftOper):
    '''
    Lift nodes by tag, optionally constrained by a tag value comparison.
    '''

    async def lift(self, runt, path):
        tag = await self.kids[0].compute(runt, path)

        if len(self.kids) != 3:
            # plain tag lift with no comparison
            async for node in runt.snap.nodesByTag(tag, reverse=self.reverse):
                yield node
            return

        cmpr = await self.kids[1].compute(runt, path)
        valu = await toprim(await self.kids[2].compute(runt, path))

        async for node in runt.snap.nodesByTagValu(tag, cmpr, valu, reverse=self.reverse):
            yield node
[docs]
class LiftByArray(LiftOper):
    '''
    :prop*[range=(200, 400)]
    '''

    async def lift(self, runt, path):
        '''
        Lift nodes by a comparison against the items of an array property.

        The property name may be a concrete property, an interface property,
        or either followed by ``::`` separated implicit pivot names.

        Raises:
            NoSuchProp: if the name is neither a property nor an interface property.
        '''
        name = await self.kids[0].compute(runt, path)
        cmpr = await self.kids[1].compute(runt, path)
        valu = await s_stormtypes.tostor(await self.kids[2].compute(runt, path))

        pivs = None
        if name.find('::') != -1:
            parts = name.split('::')
            name, pivs = parts[0], parts[1:]

        if (prop := runt.model.props.get(name)) is not None:
            props = (prop,)
        else:
            proplist = runt.model.ifaceprops.get(name)
            if proplist is None:
                raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))

            props = [runt.model.props.get(propname) for propname in proplist]

        try:

            if pivs is not None:
                pivnames = self.getPivNames(runt, props[0], pivs)

                genr = runt.snap.nodesByPropArray(pivnames[-1], cmpr, valu, reverse=self.reverse)
                async for node in self.pivlift(runt, props, pivnames, genr):
                    yield node
                return

            if len(props) == 1:
                # lift by the resolved property's full name (as LiftPropBy does) so
                # an interface property with a single implementation is lifted by
                # the concrete property rather than the interface name
                async for node in runt.snap.nodesByPropArray(props[0].full, cmpr, valu, reverse=self.reverse):
                    yield node
                return

            # several concrete props: merge the per-prop lifts, keyed on the shared relative prop
            relname = props[0].name

            def cmprkey(node):
                return node.props.get(relname)

            genrs = [runt.snap.nodesByPropArray(prop.full, cmpr, valu, reverse=self.reverse) for prop in props]

            async for node in s_common.merggenr2(genrs, cmprkey, reverse=self.reverse):
                yield node

        except s_exc.BadTypeValu as e:
            raise self.kids[2].addExcInfo(e)

        except s_exc.SynErr as e:
            raise self.addExcInfo(e)
[docs]
class LiftTagProp(LiftOper):
    '''
    #foo.bar:baz [ = x ]
    '''

    async def lift(self, runt, path):
        '''
        Lift nodes by tag property, with an optional value comparison.
        '''
        tag, prop = await self.kids[0].compute(runt, path)

        if len(self.kids) != 3:
            async for node in runt.snap.nodesByTagProp(None, tag, prop, reverse=self.reverse):
                yield node
            return

        cmpr = await self.kids[1].compute(runt, path)
        valu = await s_stormtypes.tostor(await self.kids[2].compute(runt, path))

        async for node in runt.snap.nodesByTagPropValu(None, tag, prop, cmpr, valu, reverse=self.reverse):
            yield node
[docs]
class LiftTagTag(LiftOper):
    '''
    ##foo.bar
    '''

    async def lift(self, runt, path):
        '''
        Lift nodes tagged with a tag, following syn:tag nodes that carry the
        tag to the nodes tagged with those tags in turn.
        '''
        tagname = await self.kids[0].compute(runt, path)

        if await runt.snap.getNodeByNdef(('syn:tag', tagname)) is None:
            return

        # only apply the lift valu to the top level tag of tags, not to the sub tags
        if len(self.kids) == 3:
            cmpr = await self.kids[1].compute(runt, path)
            valu = await toprim(await self.kids[2].compute(runt, path))
            first = runt.snap.nodesByTagValu(tagname, cmpr, valu, reverse=self.reverse)
        else:
            first = runt.snap.nodesByTag(tagname, reverse=self.reverse)

        seen = {tagname}
        queue = collections.deque((first,))

        while queue:

            async for node in queue.popleft():

                if node.form.name != 'syn:tag':
                    yield node
                    continue

                # a tagged tag: queue a lift of its nodes, once per tag name
                subtag = node.ndef[1]
                if subtag not in seen:
                    seen.add(subtag)
                    queue.append(runt.snap.nodesByTag(subtag, reverse=self.reverse))
[docs]
class LiftProp(LiftOper):
    '''
    Lift nodes by the presence of a form or property.
    '''

    async def lift(self, runt, path):
        '''
        Lift by a single property name, or by every property the name
        resolves to via model.reqPropsByLook().
        '''
        assert len(self.kids) == 1

        name = await tostr(await self.kids[0].compute(runt, path))

        prop = runt.model.props.get(name)
        if prop is not None:
            async for node in self.proplift(prop, runt, path):
                yield node
            return

        proplist = runt.model.reqPropsByLook(name, self.kids[0].addExcInfo)
        props = [runt.model.props.get(propname) for propname in proplist]

        if len(props) == 1 or props[0].isform:
            for prop in props:
                async for node in self.proplift(prop, runt, path):
                    yield node
            return

        # several secondary props: merge the per-prop lifts, keyed on the shared relative prop
        relname = props[0].name

        def cmprkey(node):
            return node.props.get(relname)

        genrs = [self.proplift(prop, runt, path) for prop in props]

        async for node in s_common.merggenr2(genrs, cmprkey, reverse=self.reverse):
            yield node

    async def proplift(self, prop, runt, path):
        '''
        Lift the nodes for one property.

        For a form lift, the first usable hint from the filters to the right
        (a tag or a relative property) is used to narrow the lift.
        '''
        # check if we can optimize a form lift
        if prop.isform:
            async for hint in self.getRightHints(runt, path):
                if hint[0] == 'tag':
                    tagname = hint[1].get('name')
                    async for node in runt.snap.nodesByTag(tagname, form=prop.full, reverse=self.reverse):
                        yield node
                    return

                if hint[0] == 'relprop':
                    relpropname = hint[1].get('name')
                    isuniv = hint[1].get('univ')

                    if isuniv:
                        fullname = ''.join([prop.full, relpropname])
                    else:
                        fullname = ':'.join([prop.full, relpropname])

                    prop = runt.model.prop(fullname)
                    if prop is None:
                        return

                    cmpr = hint[1].get('cmpr')
                    valu = hint[1].get('valu')

                    if cmpr is not None and valu is not None:
                        try:
                            # try lifting by valu but no guarantee a cmpr is available
                            async for node in runt.snap.nodesByPropValu(fullname, cmpr, valu, reverse=self.reverse):
                                yield node
                            return
                        except asyncio.CancelledError:  # pragma: no cover
                            raise
                        except Exception:
                            # Best effort only: fall back to the plain prop lift below.
                            # Deliberately not a bare except, which would also swallow
                            # GeneratorExit (raised at the yield when this generator is
                            # closed), KeyboardInterrupt and SystemExit.
                            pass

                    async for node in runt.snap.nodesByProp(fullname, reverse=self.reverse):
                        yield node
                    return

        async for node in runt.snap.nodesByProp(prop.full, reverse=self.reverse):
            yield node

    async def getRightHints(self, runt, path):
        '''
        Yield lift hints from the filter operators to the right of this one.

        Other lift operators are skipped; any other operator ends the scan.
        '''
        for oper in self.iterright():

            # we can skip other lifts but that's it...
            if isinstance(oper, LiftOper):
                continue

            if isinstance(oper, FiltOper):
                for hint in await oper.getLiftHints(runt, path):
                    yield hint
                continue

            return
[docs]
class LiftPropBy(LiftOper):
    '''
    Lift nodes by a comparison against a property value.
    '''

    async def lift(self, runt, path):
        '''
        The property name may be a concrete property, an interface property,
        or either followed by "::" separated implicit pivot names.
        '''
        name = await self.kids[0].compute(runt, path)
        cmpr = await self.kids[1].compute(runt, path)

        valu = await self.kids[2].compute(runt, path)
        if not isinstance(valu, s_node.Node):
            valu = await s_stormtypes.tostor(valu)

        # anything after the first "::" is a chain of implicit pivots
        name, *pivs = name.split('::')
        if not pivs:
            pivs = None

        prop = runt.model.props.get(name)
        if prop is None:
            proplist = runt.model.ifaceprops.get(name)
            if proplist is None:
                raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))

            props = [runt.model.props.get(propname) for propname in proplist]
        else:
            props = (prop,)

        try:

            if pivs is not None:
                pivnames = self.getPivNames(runt, props[0], pivs)

                genr = runt.snap.nodesByPropValu(pivnames[-1], cmpr, valu, reverse=self.reverse)
                async for node in self.pivlift(runt, props, pivnames, genr):
                    yield node
                return

            if len(props) == 1:
                prop = props[0]
                async for node in runt.snap.nodesByPropValu(prop.full, cmpr, valu, reverse=self.reverse):
                    yield node
                return

            # several concrete props: merge the per-prop lifts, keyed on the shared relative prop
            relname = props[0].name

            def cmprkey(node):
                return node.props.get(relname)

            genrs = [runt.snap.nodesByPropValu(prop.full, cmpr, valu, reverse=self.reverse) for prop in props]

            async for node in s_common.merggenr2(genrs, cmprkey, reverse=self.reverse):
                yield node

        except s_exc.BadTypeValu as e:
            raise self.kids[2].addExcInfo(e)

        except s_exc.SynErr as e:
            raise self.addExcInfo(e)
[docs]
class PivotOper(Oper):
    '''
    Base class for pivot operators.

    Subclasses in this module yield the inbound node as well as the pivot
    results when isjoin is set.
    '''

    def __init__(self, astinfo, kids=(), isjoin=False):
        Oper.__init__(self, astinfo, kids=kids)
        self.isjoin = isjoin

    def repr(self):
        clsname = type(self).__name__
        return f'{clsname}: {self.kids}, isjoin={self.isjoin}'

    def __repr__(self):
        return self.repr()
[docs]
class RawPivot(PivotOper):
    '''
    -> { <varsfrompath> }
    '''

    async def run(self, runt, genr):
        '''
        Run the sub-query once per inbound node and yield its results in
        place of the inbound node.

        The variables of the inbound path are handed to the sub-runtime so
        the query can reference them.
        '''
        query = self.kids[0]

        async for node, path in genr:
            # give the sub-runtime a copy of the path variables rather than the
            # path's own mapping
            opts = {'vars': path.vars.copy()}

            async with runt.getSubRuntime(query, opts=opts) as subr:
                async for subnode, subpath in subr.execute():
                    yield subnode, subpath
[docs]
class PivotOut(PivotOper):
    '''
    -> *
    '''

    async def run(self, runt, genr):
        async for node, path in genr:

            if self.isjoin:
                yield node, path

            async for pivot in self.getPivsOut(runt, node, path):
                yield pivot

    async def getPivsOut(self, runt, node, path):
        '''
        Yield (node, path) tuples for every node that node refers to.
        '''
        # <syn:tag> -> * is "from tags to nodes with tags"
        if node.form.name == 'syn:tag':
            taglink = {'type': 'tag', 'tag': node.ndef[1], 'reverse': True}
            async for pivo in runt.snap.nodesByTag(node.ndef[1]):
                yield pivo, path.fork(pivo, taglink)
            return

        # edge nodes pivot out to their n2 node only
        if isinstance(node.form.type, s_types.Edge):
            n2def = node.get('n2')
            pivo = await runt.snap.getNodeByNdef(n2def)
            if pivo is None:  # pragma: no cover
                logger.warning(f'Missing node corresponding to ndef {n2def} on edge')
                return

            yield pivo, path.fork(pivo, {'type': 'prop', 'prop': 'n2'})
            return

        for name, prop in node.form.props.items():

            valu = node.get(name)
            if valu is None:
                continue

            link = {'type': 'prop', 'prop': prop.name}

            # if the outbound prop is an ndef...
            if isinstance(prop.type, s_types.Ndef):
                pivo = await runt.snap.getNodeByNdef(valu)
                if pivo is not None:
                    yield pivo, path.fork(pivo, link)
                continue

            if isinstance(prop.type, s_types.Array):

                if isinstance(prop.type.arraytype, s_types.Ndef):
                    for item in valu:
                        pivo = await runt.snap.getNodeByNdef(item)
                        if pivo is not None:
                            yield pivo, path.fork(pivo, link)
                    continue

                typename = prop.type.opts.get('type')
                if runt.model.forms.get(typename) is not None:
                    for item in valu:
                        async for pivo in runt.snap.nodesByPropValu(typename, '=', item, norm=False):
                            yield pivo, path.fork(pivo, link)

                # NOTE: no continue here; control falls through to the form lookup below

            form = runt.model.forms.get(prop.type.name)
            if form is None:
                continue

            if prop.isrunt:
                async for pivo in runt.snap.nodesByPropValu(form.name, '=', valu):
                    yield pivo, path.fork(pivo, link)
                continue

            pivo = await runt.snap.getNodeByNdef((form.name, valu))
            if pivo is None:  # pragma: no cover
                continue

            # avoid self references
            if pivo.buid == node.buid:
                continue

            yield pivo, path.fork(pivo, link)
[docs]
class N1WalkNPivo(PivotOut):
    '''
    Pivot out from each node and also walk its N1 edges.
    '''

    async def run(self, runt, genr):
        async for node, path in genr:

            if self.isjoin:
                yield node, path

            async for pivot in self.getPivsOut(runt, node, path):
                yield pivot

            async for (verb, iden) in node.iterEdgesN1():
                target = await runt.snap.getNodeByBuid(s_common.uhex(iden))
                if target is None:
                    continue

                yield target, path.fork(target, {'type': 'edge', 'verb': verb})
[docs]
class PivotIn(PivotOper):
    '''
    <- *
    '''

    async def run(self, runt, genr):
        async for node, path in genr:

            if self.isjoin:
                yield node, path

            async for pivot in self.getPivsIn(runt, node, path):
                yield pivot

    async def getPivsIn(self, runt, node, path):
        '''
        Yield (node, path) tuples for every node which refers to node.
        '''
        # if it's a graph edge, use :n1
        if isinstance(node.form.type, s_types.Edge):
            n1def = node.get('n1')

            pivo = await runt.snap.getNodeByNdef(n1def)
            if pivo is not None:
                yield pivo, path.fork(pivo, {'type': 'prop', 'prop': 'n1', 'reverse': True})

            return

        formname, valu = node.ndef

        # props whose type is this form
        for prop in runt.model.getPropsByType(formname):
            link = {'type': 'prop', 'prop': prop.name, 'reverse': True}
            norm = node.form.typehash is not prop.typehash
            async for pivo in runt.snap.nodesByPropValu(prop.full, '=', valu, norm=norm):
                yield pivo, path.fork(pivo, link)

        # array props whose item type is this form
        for prop in runt.model.getArrayPropsByType(formname):
            norm = node.form.typehash is not prop.arraytypehash
            link = {'type': 'prop', 'prop': prop.name, 'reverse': True}
            async for pivo in runt.snap.nodesByPropArray(prop.full, '=', valu, norm=norm):
                yield pivo, path.fork(pivo, link)

        # nodes referring to this node via ndef references
        async for refsbuid, refprop in runt.snap.getNdefRefs(node.buid, props=True):
            pivo = await runt.snap.getNodeByBuid(refsbuid)
            yield pivo, path.fork(pivo, {'type': 'prop', 'prop': refprop, 'reverse': True})
[docs]
class N2WalkNPivo(PivotIn):
    '''
    Pivot in to each node and also walk its N2 edges.
    '''

    async def run(self, runt, genr):
        async for node, path in genr:

            if self.isjoin:
                yield node, path

            async for pivot in self.getPivsIn(runt, node, path):
                yield pivot

            async for (verb, iden) in node.iterEdgesN2():
                source = await runt.snap.getNodeByBuid(s_common.uhex(iden))
                if source is None:
                    continue

                yield source, path.fork(source, {'type': 'edge', 'verb': verb, 'reverse': True})
[docs]
class PivotInFrom(PivotOper):
    '''
    <- foo:edge
    '''

    async def run(self, runt, genr):
        '''
        Pivot in from a named form.

        If the named form is an edge form, inbound nodes pivot to the edges
        whose :n2 is the node.  Otherwise the inbound nodes must be edges
        and pivot to their :n1 node when it is of the named form.
        '''
        name = self.kids[0].value()

        form = runt.model.forms.get(name)
        if form is None:
            raise self.kids[0].addExcInfo(s_exc.NoSuchForm.init(name))

        if not isinstance(form.type, s_types.Edge):

            # edge <- form
            link = {'type': 'prop', 'prop': 'n1', 'reverse': True}
            async for node, path in genr:

                if self.isjoin:
                    yield node, path

                if not isinstance(node.form.type, s_types.Edge):
                    mesg = f'Pivot in from a specific form cannot be used with nodes of type {node.form.type.name}'
                    raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg, name=node.form.type.name))

                # dont bother traversing edges to the wrong form
                if node.get('n1:form') != form.name:
                    continue

                pivo = await runt.snap.getNodeByNdef(node.get('n1'))
                if pivo is not None:
                    yield pivo, path.fork(pivo, link)

            return

        # <- edge
        full = form.name + ':n2'
        link = {'type': 'prop', 'prop': 'n2', 'reverse': True}

        async for node, path in genr:

            if self.isjoin:
                yield node, path

            async for pivo in runt.snap.nodesByPropValu(full, '=', node.ndef, norm=False):
                yield pivo, path.fork(pivo, link)
[docs]
class PropPivotOut(PivotOper):
    '''
    :prop -> *
    '''

    async def run(self, runt, genr):
        '''
        Pivot from a secondary property value to the node(s) it refers to.

        A warning is sent at most once per run when the property type (or
        array item type) is not a form.
        '''
        warned = False

        async for node, path in genr:

            if self.isjoin:
                yield node, path

            name = await self.kids[0].compute(runt, path)

            prop = node.form.props.get(name)
            valu = None if prop is None else node.get(name)
            if valu is None:
                # all filters must sleep
                await asyncio.sleep(0)
                continue

            link = {'type': 'prop', 'prop': prop.name}

            if prop.type.isarray:

                if isinstance(prop.type.arraytype, s_types.Ndef):
                    for item in valu:
                        pivo = await runt.snap.getNodeByNdef(item)
                        if pivo is not None:
                            yield pivo, path.fork(pivo, link)
                    continue

                fname = prop.type.arraytype.name
                if runt.model.forms.get(fname) is None:
                    if not warned:
                        mesg = f'The source property "{name}" array type "{fname}" is not a form. Cannot pivot.'
                        await runt.snap.warn(mesg, log=False)
                        warned = True
                    continue

                for item in valu:
                    async for pivo in runt.snap.nodesByPropValu(fname, '=', item, norm=False):
                        yield pivo, path.fork(pivo, link)

                continue

            # ndef pivot out syntax...
            # :ndef -> *
            if isinstance(prop.type, s_types.Ndef):
                pivo = await runt.snap.getNodeByNdef(valu)
                if pivo is None:
                    logger.warning(f'Missing node corresponding to ndef {valu}')
                    continue

                yield pivo, path.fork(pivo, link)
                continue

            # :prop -> *
            fname = prop.type.name
            if prop.modl.form(fname) is None:
                if not warned:
                    await runt.snap.warn(f'The source property "{name}" type "{fname}" is not a form. Cannot pivot.',
                                         log=False)
                    warned = True
                continue

            pivo = await runt.snap.getNodeByNdef((fname, valu))

            # A node explicitly deleted in the graph or missing from a underlying layer
            # could cause this lift to return None.
            if pivo:
                yield pivo, path.fork(pivo, link)
[docs]
class PropPivot(PivotOper):
    '''
    :foo -> bar:foo
    '''

    def pivogenr(self, runt, prop):
        '''
        Return an async generator function which pivots a source property
        value to nodes of the destination property prop.

        The returned function yields (node, link) tuples.
        '''
        async def pgenr(node, srcprop, valu, strict=True):

            link = {'type': 'prop', 'prop': srcprop.name}
            if not prop.isform:
                link['dest'] = prop.full

            srcarray = srcprop.type.isarray
            dstarray = prop.type.isarray

            # pivoting from an array prop to a non-array prop needs an extra loop
            if srcarray and not dstarray:

                if isinstance(srcprop.type.arraytype, s_types.Ndef) and prop.isform:
                    for aval in valu:
                        # only follow ndefs of the destination form
                        if aval[0] != prop.form.name:
                            continue

                        pivo = await runt.snap.getNodeByNdef(aval)
                        if pivo is not None:
                            yield pivo, link
                    return

                norm = srcprop.arraytypehash is not prop.typehash
                for arrayval in valu:
                    async for pivo in runt.snap.nodesByPropValu(prop.full, '=', arrayval, norm=norm):
                        yield pivo, link
                return

            if isinstance(srcprop.type, s_types.Ndef) and prop.isform:
                if valu[0] != prop.form.name:
                    return

                pivo = await runt.snap.getNodeByNdef(valu)
                if pivo is None:
                    await runt.snap.warn(f'Missing node corresponding to ndef {valu}', log=False, ndef=valu)
                    return

                yield pivo, link
                return

            if dstarray and not srcarray:
                norm = prop.arraytypehash is not srcprop.typehash
                genr = runt.snap.nodesByPropArray(prop.full, '=', valu, norm=norm)
            else:
                norm = prop.typehash is not srcprop.typehash
                genr = runt.snap.nodesByPropValu(prop.full, '=', valu, norm=norm)

            async for pivo in genr:
                yield pivo, link

        return pgenr

    def buildgenr(self, runt, name):
        '''
        Build the pivot generator function for a destination name.

        name may be a property name, an interface property name, or a list
        of property names.
        '''
        if isinstance(name, list):
            proplist = name
        else:
            prop = runt.model.props.get(name)
            if prop is not None:
                return self.pivogenr(runt, prop)

            proplist = runt.model.ifaceprops.get(name)
            if proplist is None:
                raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))

        pgenrs = []
        for propname in proplist:
            prop = runt.model.props.get(propname)
            if prop is None:
                raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(propname))

            pgenrs.append(self.pivogenr(runt, prop))

        async def listpivot(node, srcprop, valu):
            # run every destination pivot in turn
            for pgenr in pgenrs:
                async for pivo in pgenr(node, srcprop, valu, strict=False):
                    yield pivo

        return listpivot

    async def run(self, runt, genr):
        '''
        Pivot each inbound node from its source property to the destination.

        BadTypeValu / BadLiftValu errors raised while pivoting are reported
        as warnings (and logged once per run) instead of being raised.
        '''
        pgenr = None
        warned = False

        async for node, path in genr:

            # the generator is rebuilt per node unless the destination is constant
            if pgenr is None or not self.kids[1].isconst:
                name = await self.kids[1].compute(runt, None)
                pgenr = self.buildgenr(runt, name)

            if self.isjoin:
                yield node, path

            srcprop, valu = await self.kids[0].getPropAndValu(runt, path)
            if valu is None:
                # all filters must sleep
                await asyncio.sleep(0)
                continue

            try:
                async for pivo, link in pgenr(node, srcprop, valu):
                    yield pivo, path.fork(pivo, link)

            except (s_exc.BadTypeValu, s_exc.BadLiftValu) as e:
                if not warned:
                    logger.warning(f'Caught error during pivot: {e.items()}')
                    warned = True

                items = e.items()
                mesg = items.pop('mesg', '')
                mesg = ': '.join((f'{e.__class__.__qualname__} [{repr(valu)}] during pivot', mesg))
                await runt.snap.warn(mesg, log=False, **items)
[docs]
class Value(AstNode):
    '''
    The base class for all values and value expressions.
    '''

    def __init__(self, astinfo, kids=()):
        AstNode.__init__(self, astinfo, kids=kids)

    def __repr__(self):
        return self.repr()

    def isRuntSafe(self, runt):
        '''
        A value is runtime safe only if every one of its kids is.
        '''
        for kid in self.kids:
            if not kid.isRuntSafe(runt):
                return False
        return True

    async def compute(self, runt, path):  # pragma: no cover
        '''
        Compute the value; subclasses must override this.
        '''
        name = f'{self.__class__.__name__}.compute()'
        raise self.addExcInfo(s_exc.NoSuchImpl(name=name))

    async def getLiftHints(self, runt, path):
        '''
        Return lift hints for this value; the base implementation has none.
        '''
        return []

    async def getCondEval(self, runt):
        '''
        Return a function that may be used to evaluate the boolean truth
        of the value expression using a runtime and optional node path.
        '''
        async def cond(node, path):
            valu = await self.compute(runt, path)
            return await tobool(valu)

        return cond
[docs]
class Cond(Value):
    '''
    A condition that is evaluated to filter nodes.
    '''
    # Cond is kept as a distinct subclass of Value because Cond
    # instances may always presume they are being evaluated per node.
[docs]
class SubqCond(Cond):
    '''
    A filter condition based on a sub-query.

    With three kids (subquery, comparator, value) the number of results the
    sub-query yields for a node is compared against the value.  Otherwise
    the condition is true if the sub-query yields anything at all.
    '''

    def __init__(self, astinfo, kids=()):
        Cond.__init__(self, astinfo, kids=kids)
        # comparator -> factory taking a runtime and returning a cond(node, path) coroutine function
        self.funcs = {
            '=': self._subqCondEq,
            '>': self._subqCondGt,
            '<': self._subqCondLt,
            '>=': self._subqCondGe,
            '<=': self._subqCondLe,
            '!=': self._subqCondNe,
        }

    async def _runSubQuery(self, runt, node, path):
        '''
        Run the sub-query for one node, yielding (count, item) tuples where
        count is the 1-based number of results seen so far.
        '''
        size = 1
        genr = s_common.agen((node, path))
        async for item in self.kids[0].run(runt, genr):
            yield size, item
            size += 1

    def _sizeCond(self, runt, stop, stopretn, done):
        '''
        Build a cond(node, path) function comparing the result count to a value.

        Args:
            runt: The runtime used to compute the value and run the sub-query.
            stop: stop(size, valu) -> bool; when true the sub-query is abandoned
                early and stopretn is returned.
            stopretn (bool): The verdict for an early stop.
            done: done(size, valu) -> bool; the verdict once the sub-query is
                exhausted, where size is the total result count (0 if it
                yielded nothing).

        In both cases the variables of the last result path seen are merged
        into the node's path.
        '''
        async def cond(node, path):

            size = 0
            item = None
            valu = s_stormtypes.intify(await self.kids[2].compute(runt, path))

            async for size, item in self._runSubQuery(runt, node, path):
                if stop(size, valu):
                    path.vars.update(item[1].vars)
                    return stopretn

            if item is not None:
                path.vars.update(item[1].vars)

            # Compare the final count rather than returning a fixed answer so
            # that an empty result set is judged correctly for bounds <= 0.
            return done(size, valu)

        return cond

    def _subqCondEq(self, runt):
        return self._sizeCond(runt, lambda size, valu: size > valu, False,
                              lambda size, valu: size == valu)

    def _subqCondGt(self, runt):
        return self._sizeCond(runt, lambda size, valu: size > valu, True,
                              lambda size, valu: size > valu)

    def _subqCondLt(self, runt):
        return self._sizeCond(runt, lambda size, valu: size >= valu, False,
                              lambda size, valu: size < valu)

    def _subqCondGe(self, runt):
        return self._sizeCond(runt, lambda size, valu: size >= valu, True,
                              lambda size, valu: size >= valu)

    def _subqCondLe(self, runt):
        return self._sizeCond(runt, lambda size, valu: size > valu, False,
                              lambda size, valu: size <= valu)

    def _subqCondNe(self, runt):
        return self._sizeCond(runt, lambda size, valu: size > valu, True,
                              lambda size, valu: size != valu)

    async def getCondEval(self, runt):
        '''
        Return the cond(node, path) function for this sub-query condition.

        Raises:
            NoSuchCmpr: if a comparator is given that sub-queries do not support.
        '''
        if len(self.kids) == 3:
            cmpr = await self.kids[1].compute(runt, None)
            ctor = self.funcs.get(cmpr)
            if ctor is None:
                raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, type='subquery'))

            return ctor(runt)

        subq = self.kids[0]

        async def cond(node, path):
            genr = s_common.agen((node, path))
            # the first result is enough to decide; keep its variables
            async for _, subp in subq.run(runt, genr):
                path.vars.update(subp.vars)
                return True

            return False

        return cond
[docs]
class OrCond(Cond):
    '''
    <cond> or <cond>
    '''

    async def getCondEval(self, runt):
        '''
        Return a cond which is true if either kid is; the right kid is only
        evaluated when the left one is false.
        '''
        left = await self.kids[0].getCondEval(runt)
        right = await self.kids[1].getCondEval(runt)

        async def cond(node, path):
            if await left(node, path):
                return True

            return await right(node, path)

        return cond
[docs]
class AndCond(Cond):
    '''
    <cond> and <cond>
    '''

    async def getLiftHints(self, runt, path):
        '''
        Return the lift hints of both sub-conditions, left hints first.

        Both conditions must hold for a node to pass, so a hint from either
        side is usable.  Kids may return lists or tuples, so both results
        are normalised to lists before being joined.
        '''
        h0 = await self.kids[0].getLiftHints(runt, path)
        h1 = await self.kids[1].getLiftHints(runt, path)
        return list(h0) + list(h1)

    async def getCondEval(self, runt):
        '''
        Return a cond which is true only if both kids are; the right kid is
        only evaluated when the left one is true.
        '''
        cond0 = await self.kids[0].getCondEval(runt)
        cond1 = await self.kids[1].getCondEval(runt)

        async def cond(node, path):

            if not await cond0(node, path):
                return False

            return await cond1(node, path)

        return cond
[docs]
class NotCond(Cond):
    '''
    not <cond>
    '''

    async def getCondEval(self, runt):
        '''
        Return a cond which negates the result of the kid's cond.
        '''
        inner = await self.kids[0].getCondEval(runt)

        async def cond(node, path):
            result = await inner(node, path)
            return not result

        return cond
[docs]
class TagCond(Cond):
    '''
    #foo.bar
    '''

    async def getLiftHints(self, runt, path):
        '''
        Return a single 'tag' hint when the tag name can be determined
        without a node and contains no glob; otherwise no hints.
        '''
        kid = self.kids[0]

        if not isinstance(kid, TagMatch) or kid.hasglob():
            return []

        if kid.isconst:
            return (
                ('tag', {'name': await kid.compute(None, None)}),
            )

        if kid.isRuntSafe(runt):
            name = await kid.compute(runt, path)
            if name and '*' not in name:
                return (
                    ('tag', {'name': name}),
                )

        return []

    async def getCondEval(self, runt):
        '''
        Return a cond testing whether a node has the tag.

        The tag name is computed per node; "*" matches any tag and a name
        containing "*" is matched as a glob against the node's tags.
        '''
        assert len(self.kids) == 1

        # kid is a non-runtsafe VarValue: dynamically evaluate value of variable for each node
        async def cond(node, path):
            name = await self.kids[0].compute(runt, path)

            if name == '*':
                return bool(node.tags)

            if '*' not in name:
                return node.tags.get(name) is not None

            reobj = s_cache.getTagGlobRegx(name)
            return any(reobj.fullmatch(tag) for tag in node.tags)

        return cond
[docs]
class HasRelPropCond(Cond):
    '''
    A condition testing whether a node has a relative property set.
    '''

    async def getCondEval(self, runt):
        relprop = self.kids[0]
        assert isinstance(relprop, RelProp)

        if not relprop.isconst:

            # relprop name itself is variable, so dynamically compute
            async def dyncond(node, path):
                name = await relprop.compute(runt, path)
                return await self.hasProp(node, runt, name)

            return dyncond

        name = await relprop.compute(runt, None)

        async def constcond(node, path):
            return await self.hasProp(node, runt, name)

        return constcond

    async def hasProp(self, node, runt, name):
        '''
        Check whether node has the property name set.

        A name containing "::" is treated as a chain of implicit pivots:
        each part but the last is followed to the node it refers to.

        Raises:
            NoSuchProp: if an intermediate part is not a property of the current form.
            NoSuchForm: if an intermediate property's type is not a form.
        '''
        if name.find('::') == -1:
            return node.has(name)

        # handle implicit pivot properties
        names = name.split('::')
        last = len(names) - 1

        for indx, part in enumerate(names):

            valu = node.get(part)
            if valu is None:
                return False

            if indx >= last:
                return True

            prop = node.form.props.get(part)
            if prop is None:
                mesg = f'No property named {node.form.name}:{part}'
                exc = s_exc.NoSuchProp(mesg=mesg, name=part, form=node.form.name)
                raise self.kids[0].addExcInfo(exc)

            form = runt.model.forms.get(prop.type.name)
            if form is None:
                mesg = f'No form {prop.type.name}'
                exc = s_exc.NoSuchForm.init(prop.type.name)
                raise self.kids[0].addExcInfo(exc)

            # step to the node the property value refers to
            node = await runt.snap.getNodeByNdef((form.name, valu))
            if node is None:
                return False

    async def getLiftHints(self, runt, path):
        '''
        Return a single 'relprop' hint naming the (first) relative property.
        '''
        relprop = self.kids[0]
        name = await relprop.compute(runt, path)

        if name.find('::') != -1:
            # only the first part is a property of the lifted form
            hint = {'name': name.split('::')[0]}
        else:
            hint = {
                'name': name,
                'univ': isinstance(relprop, UnivProp),
            }

        return (
            ('relprop', hint),
        )
[docs]
class HasTagPropCond(Cond):
    '''
    A condition testing whether a node has a tag property set.
    '''

    async def getCondEval(self, runt):

        async def cond(node, path):
            tag = await self.kids[0].compute(runt, path)
            name = await self.kids[1].compute(runt, path)

            # "*" matches the property on any tag
            if tag == '*':
                return any(name in props for props in node.tagprops.values())

            if '*' in tag:
                reobj = s_cache.getTagGlobRegx(tag)
                if any(reobj.fullmatch(tagname) and name in props
                       for tagname, props in node.tagprops.items()):
                    return True

            return node.hasTagProp(tag, name)

        return cond
[docs]
class HasAbsPropCond(Cond):
    '''
    A condition testing for a form or property given by absolute name.

    The name may be a form, a secondary property, a form name prefix ending
    in "*", an interface name, or an interface property name.
    '''

    async def getCondEval(self, runt):

        name = await self.kids[0].compute(runt, None)

        prop = runt.model.props.get(name)
        if prop is not None:

            if not prop.isform:

                async def propcond(node, path):
                    if node.form.name != prop.form.name:
                        return False

                    return node.has(prop.name)

                return propcond

            async def formcond(node, path):
                return node.form.name == prop.name

            return formcond

        if name.endswith('*'):
            formlist = runt.model.reqFormsByPrefix(name[:-1], extra=self.kids[0].addExcInfo)

            async def prefixcond(node, path):
                return node.form.name in formlist

            return prefixcond

        if (formlist := runt.model.formsbyiface.get(name)) is not None:

            async def ifacecond(node, path):
                return node.form.name in formlist

            return ifacecond

        if (proplist := runt.model.ifaceprops.get(name)) is not None:

            formlist = []
            for propname in proplist:
                prop = runt.model.props.get(propname)
                formlist.append(prop.form.name)
                # the relative name of the last prop is the one tested below
                relname = prop.name

            async def ifacepropcond(node, path):
                if node.form.name not in formlist:
                    return False

                return node.has(relname)

            return ifacepropcond

        raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))
[docs]
class ArrayCond(Cond):
    '''
    A condition which is true if any item of an array property matches a
    comparison.
    '''

    async def getCondEval(self, runt):
        '''
        Return the cond(node, path) function for the array filter.

        The returned function raises:
            NoSuchProp: if the node's form has no such property.
            BadCmprType: if the property is not an array.
            NoSuchCmpr: if the array's item type does not support the comparator.
        '''
        cmpr = await self.kids[1].compute(runt, None)

        async def cond(node, path):

            name = await self.kids[0].compute(runt, None)
            prop = node.form.props.get(name)
            if prop is None:
                raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))

            if not prop.type.isarray:
                mesg = f'Array filter syntax is invalid for non-array prop {name}.'
                raise self.kids[1].addExcInfo(s_exc.BadCmprType(mesg=mesg))

            ctor = prop.type.arraytype.getCmprCtor(cmpr)
            if ctor is None:
                # report an unsupported comparator the same way the other
                # conditions do rather than failing on a None call below
                raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=prop.type.arraytype.name))

            items = node.get(name)
            if items is None:
                return False

            val2 = await self.kids[2].compute(runt, path)
            if not items:
                return False

            # the comparator only depends on val2, so build it once
            func = ctor(val2)
            return any(func(item) for item in items)

        return cond
[docs]
class AbsPropCond(Cond):
    '''
    A condition comparing a form or property, given by absolute name,
    against a value.
    '''

    async def getCondEval(self, runt):
        '''
        Return the cond(node, path) function for the comparison.

        Raises:
            NoSuchCmpr: if the property type does not support the comparator.
            NoSuchProp: if the name is neither a property nor an interface property.
        '''
        name = await self.kids[0].compute(runt, None)
        cmpr = await self.kids[1].compute(runt, None)
        valukid = self.kids

        prop = runt.model.props.get(name)
        if prop is not None:

            ctor = prop.type.getCmprCtor(cmpr)
            if ctor is None:
                raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=prop.type.name))

            if not prop.isform:

                async def propcond(node, path):
                    if node.ndef[0] != prop.form.name:
                        return False

                    have = node.get(prop.name)
                    if have is None:
                        return False

                    want = await valukid[2].compute(runt, path)
                    return ctor(want)(have)

                return propcond

            async def formcond(node, path):
                # compare against the primary property value
                if node.ndef[0] != name:
                    return False

                have = node.ndef[1]
                want = await valukid[2].compute(runt, path)
                return ctor(want)(have)

            return formcond

        proplist = runt.model.ifaceprops.get(name)
        if proplist is None:
            raise self.kids[0].addExcInfo(s_exc.NoSuchProp.init(name))

        # the first concrete prop supplies the relative name and comparator
        prop = runt.model.props.get(proplist[0])
        relname = prop.name

        ctor = prop.type.getCmprCtor(cmpr)
        if ctor is None:
            raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=prop.type.name))

        async def ifacecond(node, path):
            have = node.get(relname)
            if have is None:
                return False

            want = await valukid[2].compute(runt, path)
            return ctor(want)(have)

        return ifacecond
[docs]
class TagValuCond(Cond):
    '''
    A condition comparing the value of a tag on a node, using the
    comparators of the 'ival' model type.
    '''

    async def getCondEval(self, runt):

        lnode, cnode, rnode = self.kids

        ival = runt.model.type('ival')

        cmpr = await cnode.compute(runt, None)
        cmprctor = ival.getCmprCtor(cmpr)
        if cmprctor is None:
            raise cnode.addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=ival.name))

        if isinstance(lnode, VarValue) or not lnode.isconst:

            # the tag name has to be computed for each node
            async def dyncond(node, path):
                name = await lnode.compute(runt, path)
                if '*' in name:
                    mesg = f'Wildcard tag names may not be used in conjunction with tag value comparison: {name}'
                    raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg, name=name))

                valu = await rnode.compute(runt, path)
                return cmprctor(valu)(node.tags.get(name))

            return dyncond

        name = await lnode.compute(runt, None)

        if not isinstance(rnode, Const):

            # it's a runtime value...
            async def runtcond(node, path):
                valu = await self.kids[2].compute(runt, path)
                return cmprctor(valu)(node.tags.get(name))

            return runtcond

        # constant name and value: build the comparator once
        valu = await rnode.compute(runt, None)
        cmprfunc = cmprctor(valu)

        async def constcond(node, path):
            return cmprfunc(node.tags.get(name))

        return constcond
[docs]
class RelPropCond(Cond):
    '''
    (:foo:bar or .univ) <cmpr> <value>
    '''
    async def getCondEval(self, runt):
        '''
        Return an async cond(node, path) callable for the relative property comparison.

        The callback returns False when the property has no value or when the
        comparison value resolves to None.

        Raises:
            NoSuchCmpr: (from the callback) if the property type lacks the comparator.
        '''
        cmpr = await self.kids[1].compute(runt, None)
        valukid = self.kids[2]

        async def cond(node, path):

            prop, have = await self.kids[0].getPropAndValu(runt, path)
            if have is None:
                return False

            want = await valukid.compute(runt, path)
            if not isinstance(want, s_node.Node):
                # anything other than a Node is converted to a storable value
                want = await s_stormtypes.tostor(want)

            if want is None:
                return False

            ctor = prop.type.getCmprCtor(cmpr)
            if ctor is None:
                raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=prop.type.name))

            return ctor(want)(have)

        return cond

    async def getLiftHints(self, runt, path):
        '''
        Return a one element tuple of ('relprop', info) describing this condition.

        For an implicit pivot name (containing "::") only the first name
        segment is reported; otherwise name, univ, cmpr and valu are included.
        '''
        relprop = self.kids[0].kids[0]

        name = await relprop.compute(runt, path)

        if '::' in name:
            first = name.split('::')[0]
            return (('relprop', {'name': first}),)

        isuniv = isinstance(relprop, UnivProp)
        cmpr = await self.kids[1].compute(runt, path)
        valu = await self.kids[2].compute(runt, path)

        hint = {'name': name, 'univ': isuniv, 'cmpr': cmpr, 'valu': valu}
        return (('relprop', hint),)
[docs]
class TagPropCond(Cond):
    '''
    Filter condition which compares the value of a tag property on a node.

    The kids are (tag name, tag property name, comparator, value).
    '''
    async def getCondEval(self, runt):
        '''
        Return an async cond(node, path) callable for the tag property comparison.

        The comparator is computed once; the tag, property name and value are
        computed for every node. The callback returns False when the node has
        no value for the tag property.

        Raises:
            StormRuntimeError: (from the callback) if the tag name contains "*".
            NoSuchTagProp: (from the callback) if the model has no such tag property.
            NoSuchCmpr: (from the callback) if the property type lacks the comparator.
        '''
        cmpr = await self.kids[2].compute(runt, None)

        async def cond(node, path):

            tag = await self.kids[0].compute(runt, path)
            name = await self.kids[1].compute(runt, path)

            if '*' in tag:
                mesg = f'Wildcard tag names may not be used in conjunction with tagprop value comparison: {tag}'
                raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg, name=tag))

            tagprop = runt.model.getTagProp(name)
            if tagprop is None:
                mesg = f'No such tag property: {name}'
                # NOTE(review): the highlight is anchored on kids[0] although the name
                # comes from kids[1] - confirm whether this is intended.
                raise self.kids[0].addExcInfo(s_exc.NoSuchTagProp(name=name, mesg=mesg))

            # TODO cache on (cmpr, valu) for perf?
            want = await self.kids[3].compute(runt, path)

            ctor = tagprop.type.getCmprCtor(cmpr)
            if ctor is None:
                # NOTE(review): the highlight is anchored on kids[1] although the comparator
                # comes from kids[2] - confirm whether this is intended.
                raise self.kids[1].addExcInfo(s_exc.NoSuchCmpr(cmpr=cmpr, name=tagprop.type.name))

            have = node.getTagProp(tag, name)
            if have is None:
                return False

            return ctor(want)(have)

        return cond
[docs]
class FiltOper(Oper):
    '''
    Filter operator: kids[0] is the "+" / other mode marker and kids[1] the condition.
    '''
    async def getLiftHints(self, runt, path):
        '''
        Return the lift hints of the condition for "+" filters, otherwise an empty list.
        '''
        mode = await self.kids[0].compute(None, None)
        if mode == '+':
            return await self.kids[1].getLiftHints(runt, path)
        return []

    async def run(self, runt, genr):
        '''
        Yield inbound (node, path) tuples which pass the filter.

        With "+" the node is kept when the condition is truthy; with any other
        mode it is kept when the condition is falsy.
        '''
        must = await self.kids[0].compute(None, None) == '+'
        cond = await self.kids[1].getCondEval(runt)

        async for node, path in genr:
            matched = bool(await cond(node, path))
            if matched == must:
                yield node, path
                continue

            # all filters must sleep
            await asyncio.sleep(0)
[docs]
class FiltByArray(FiltOper):
    '''
    Filter operator variant which adds nothing of its own to FiltOper.

    Example syntax:

        +:foo*[^=visi]
    '''
[docs]
class ArgvQuery(Value):
    '''
    A query given as an argument value; it computes to the text of its first kid.
    '''
    # runt-safety checks do not recurse into the kids of this node
    runtopaque = True

    def isRuntSafe(self, runt):
        '''
        Always True: an argv query is really just a string.
        '''
        return True

    def validate(self, runt):
        '''
        No-op: validation is done by the sub-runtime.
        '''

    async def compute(self, runt, path):
        '''
        Return the text attribute of the first kid.
        '''
        query = self.kids[0]
        return query.text
[docs]
class PropValue(Value):
    '''
    The value of a property on the node in the current path.

    Property names containing "::" are treated as implicit pivots which are
    followed node by node.
    '''
    def prepare(self):
        self.isconst = isinstance(self.kids[0], Const)

    def isRuntSafe(self, runt):
        # property values always depend on the node in the pipeline
        return False

    def isRuntSafeAtom(self, runt):
        return False

    async def getPropAndValu(self, runt, path):
        '''
        Resolve the property and its current value for the node in path.

        Args:
            runt: The Storm runtime.
            path: The current path; a falsy path yields (None, None).

        Returns:
            A (prop, valu) tuple. (None, None) is returned when there is no
            path or when an implicit pivot can not be followed to the end.

        Raises:
            NoSuchProp: If a property name does not exist on the form (or the
                error produced by typeerr() for a non-string name).
            NoSuchForm: If a pivot property type does not name a form.
        '''
        if not path:
            return None, None

        propname = await self.kids[0].compute(runt, path)
        name = await tostr(propname)

        if '::' not in name:

            prop = path.node.form.props.get(name)
            if prop is None:
                exc = await s_stormtypes.typeerr(propname, str)
                if exc is None:
                    mesg = f'No property named {name}.'
                    exc = s_exc.NoSuchProp(mesg=mesg, name=name, form=path.node.form.name)
                raise self.kids[0].addExcInfo(exc)

            valu = path.node.get(name)
            if isinstance(valu, (dict, list, tuple)):
                # these get special cased because changing them affects the node
                # while it's in the pipeline but the modification doesn't get stored
                valu = s_msgpack.deepcopy(valu)

            return prop, valu

        # handle implicit pivot properties
        segments = name.split('::')
        lastindx = len(segments) - 1
        node = path.node

        for indx, name in enumerate(segments):

            valu = node.get(name)
            if valu is None:
                return None, None

            prop = node.form.props.get(name)
            if prop is None:  # pragma: no cover
                exc = await s_stormtypes.typeerr(propname, str)
                if exc is None:
                    mesg = f'No property named {name}.'
                    exc = s_exc.NoSuchProp(mesg=mesg, name=name, form=node.form.name)
                raise self.kids[0].addExcInfo(exc)

            if indx >= lastindx:
                if isinstance(valu, (dict, list, tuple)):
                    # copy for the same reason as the non-pivot case above
                    valu = s_msgpack.deepcopy(valu)
                return prop, valu

            # follow the pivot: the property type name must be a form name
            form = runt.model.forms.get(prop.type.name)
            if form is None:
                raise self.addExcInfo(s_exc.NoSuchForm.init(prop.type.name))

            node = await runt.snap.getNodeByNdef((form.name, valu))
            if node is None:
                return None, None

    async def compute(self, runt, path):
        '''
        Return only the value half of getPropAndValu().
        '''
        _prop, valu = await self.getPropAndValu(runt, path)
        return valu
[docs]
class RelPropValue(PropValue):
    '''
    A PropValue subclass which adds no behavior of its own.
    '''
[docs]
class UnivPropValue(PropValue):
    '''
    A PropValue subclass which adds no behavior of its own.
    '''
[docs]
class TagValue(Value):
    '''
    The value of a tag on the node in the current path.
    '''
    def isRuntSafe(self, runt):
        # depends on the node in the pipeline, so never runtsafe
        return False

    def isRuntSafeAtom(self, runt):
        return False

    async def compute(self, runt, path):
        '''
        Compute the tag name from the first kid and return path.node.getTag() for it.
        '''
        tagname = await self.kids[0].compute(runt, path)
        node = path.node
        return node.getTag(tagname)
[docs]
class TagProp(Value):
    '''
    A (tag, prop) name pair computed from the first two kids.
    '''
    async def compute(self, runt, path):
        '''
        Return a (tag, prop) tuple; the tag kid is computed before the prop kid.
        '''
        tagkid, propkid = self.kids[0], self.kids[1]
        tagname = await tagkid.compute(runt, path)
        propname = await propkid.compute(runt, path)
        return (tagname, propname)
[docs]
class TagPropValue(Value):
    '''
    The value of a tag property on the node in the current path.
    '''
    async def compute(self, runt, path):
        '''
        Compute the (tag, prop) pair from the first kid and return node.getTagProp() for it.
        '''
        names = await self.kids[0].compute(runt, path)
        tagname, propname = names
        return path.node.getTagProp(tagname, propname)
[docs]
class CallArgs(Value):
    '''
    A sequence of argument values.
    '''
    async def compute(self, runt, path):
        '''
        Return a list with the computed value of every kid, in order.
        '''
        argv = []
        for kid in self.kids:
            argv.append(await kid.compute(runt, path))
        return argv
[docs]
class CallKwarg(CallArgs):
    '''
    A CallArgs subclass which adds no behavior of its own.
    '''
[docs]
class CallKwargs(CallArgs):
    '''
    A CallArgs subclass which adds no behavior of its own.
    '''
[docs]
class VarValue(Value):
    '''
    A reference to a Storm variable by its constant name.
    '''
    def validate(self, runt):
        '''
        Raise NoSuchVar if the runtime has no runtvars entry for the variable.
        '''
        if runt.runtvars.get(self.name) is not None:
            return

        exc = s_exc.NoSuchVar(mesg=f'Missing variable: {self.name}', name=self.name)
        raise self.addExcInfo(exc)

    def prepare(self):
        namekid = self.kids[0]
        assert isinstance(namekid, Const)
        self.name = namekid.value()
        self.isconst = False

    def isRuntSafe(self, runt):
        return runt.isRuntVar(self.name)

    def isRuntSafeAtom(self, runt):
        return runt.isRuntVar(self.name)

    def hasVarName(self, name):
        return name == self.kids[0].value()

    async def compute(self, runt, path):
        '''
        Return the variable value, preferring the path scope over the runtime scope.

        Raises:
            NoSuchVar: If the variable has no value in either scope; the
                runtsafe field reports runt.isRuntVar() for the name.
        '''
        novalu = s_common.novalu

        if path is not None:
            pathvalu = path.getVar(self.name, defv=novalu)
            if pathvalu is not novalu:
                return pathvalu

        runtvalu = runt.getVar(self.name, defv=novalu)
        if runtvalu is not novalu:
            return runtvalu

        if not runt.isRuntVar(self.name):
            exc = s_exc.NoSuchVar(mesg=f'Non-runtsafe variable used before assignment: {self.name}',
                                  name=self.name, runtsafe=False)
            raise self.addExcInfo(exc)

        exc = s_exc.NoSuchVar(mesg=f'Runtsafe variable used before assignment: {self.name}',
                              name=self.name, runtsafe=True)
        raise self.addExcInfo(exc)
[docs]
class VarDeref(Value):
    '''
    Dereference a name (kids[1]) on a base value (kids[0]).
    '''
    async def compute(self, runt, path):
        '''
        Return the result of deref(name) on the base value wrapped by fromprim().

        A base of None short-circuits to None before the name is computed.
        SynErr exceptions raised by deref() get highlight info from kids[1].
        '''
        base = await self.kids[0].compute(runt, path)
        if base is None:
            # the deref of None is always None
            return None

        name = await self.kids[1].compute(runt, path)
        target = s_stormtypes.fromprim(base, path=path)

        with s_scope.enter({'runt': runt}):
            try:
                return await target.deref(name)
            except s_exc.SynErr as exc:
                raise self.kids[1].addExcInfo(exc)
[docs]
class FuncCall(Value):
    '''
    Call a callable: kids are (function value, positional args, keyword args).
    '''
    async def compute(self, runt, path):
        '''
        Compute the function and its arguments, call it, and return the result.

        Coroutine results are awaited. The call is made within a scope which
        exposes the runtime as "runt".

        Raises:
            StormRuntimeError: If the computed function value is not callable.
            IsReadOnly: If the runtime is readonly and the function does not
                have a truthy _storm_readonly attribute.
        '''
        func = await self.kids[0].compute(runt, path)

        if not callable(func):
            text = self.getAstText()
            styp = await s_stormtypes.totype(func, basetypes=True)
            mesg = f"'{styp}' object is not callable: {text}"
            raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

        readsafe = getattr(func, '_storm_readonly', False)
        if runt.readonly and not readsafe:
            mesg = f'Function ({func.__name__}) is not marked readonly safe.'
            raise self.kids[0].addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        argv = await self.kids[1].compute(runt, path)
        kwitems = await self.kids[2].compute(runt, path)
        kwargs = {key: valu for (key, valu) in kwitems}

        with s_scope.enter({'runt': runt}):
            retn = func(*argv, **kwargs)
            if not s_coro.iscoro(retn):
                return retn
            return await retn
[docs]
class DollarExpr(Value):
    '''
    Top level node for $(...) expressions
    '''
    async def compute(self, runt, path):
        '''
        Return the computed value of the wrapped expression (the first kid).
        '''
        inner = self.kids[0]
        return await inner.compute(runt, path)
[docs]
async def expr_add(x, y):
    '''
    Return tonumber(x) + tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs + rhs
[docs]
async def expr_sub(x, y):
    '''
    Return tonumber(x) - tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs - rhs
[docs]
async def expr_mod(x, y):
    '''
    Return tonumber(x) % tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs % rhs
[docs]
async def expr_mul(x, y):
    '''
    Return tonumber(x) * tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs * rhs
[docs]
async def expr_div(x, y):
    '''
    Divide tonumber(x) by tonumber(y).

    Floor division is used when both converted operands are int instances;
    true division is used otherwise.
    '''
    numer = await tonumber(x)
    denom = await tonumber(y)

    bothints = isinstance(numer, int) and isinstance(denom, int)
    return numer // denom if bothints else numer / denom
[docs]
async def expr_pow(x, y):
    '''
    Return tonumber(x) ** tonumber(y).
    '''
    base = await tonumber(x)
    exponent = await tonumber(y)
    return base ** exponent
[docs]
async def expr_eq(x, y):
    '''
    Return whether tocmprvalu(x) == tocmprvalu(y).
    '''
    lhs = await tocmprvalu(x)
    rhs = await tocmprvalu(y)
    return lhs == rhs
[docs]
async def expr_ne(x, y):
    '''
    Return whether tocmprvalu(x) != tocmprvalu(y).
    '''
    lhs = await tocmprvalu(x)
    rhs = await tocmprvalu(y)
    return lhs != rhs
[docs]
async def expr_gt(x, y):
    '''
    Return whether tonumber(x) > tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs > rhs
[docs]
async def expr_lt(x, y):
    '''
    Return whether tonumber(x) < tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs < rhs
[docs]
async def expr_ge(x, y):
    '''
    Return whether tonumber(x) >= tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs >= rhs
[docs]
async def expr_le(x, y):
    '''
    Return whether tonumber(x) <= tonumber(y).
    '''
    lhs = await tonumber(x)
    rhs = await tonumber(y)
    return lhs <= rhs
[docs]
async def expr_prefix(x, y):
    '''
    Return whether tostr(x) starts with tostr(y).
    '''
    text = await tostr(x)
    prefix = await tostr(y)
    return text.startswith(prefix)
[docs]
async def expr_re(x, y):
    '''
    Return True if the pattern tostr(y) is found in tostr(x), ignoring case.
    '''
    # the pattern is converted before the text, as in the call argument order
    pattern = await tostr(y)
    text = await tostr(x)
    return regex.search(pattern, text, flags=regex.I) is not None
# Binary operator text -> async implementation taking (x, y).
# ExprNode.prepare() indexes this map with the value of its operator kid.
_ExprFuncMap = {
    '+': expr_add,
    '-': expr_sub,
    '%': expr_mod,
    '*': expr_mul,
    '/': expr_div,
    '**': expr_pow,
    '=': expr_eq,
    '!=': expr_ne,
    '~=': expr_re,
    '>': expr_gt,
    '<': expr_lt,
    '>=': expr_ge,
    '<=': expr_le,
    '^=': expr_prefix,
}
[docs]
async def expr_not(x):
    '''
    Return the logical negation of tobool(x).
    '''
    truth = await tobool(x)
    return not truth
[docs]
async def expr_neg(x):
    '''
    Return tonumber(x) multiplied by -1.
    '''
    number = await tonumber(x)
    return number * -1
# Unary operator text -> async implementation taking (x).
# UnaryExprNode.prepare() indexes this map with the value of its operator kid.
_UnaryExprFuncMap = {
    '-': expr_neg,
    'not': expr_not,
}
[docs]
class UnaryExprNode(Value):
    '''
    A unary (i.e. single-argument) expression node
    '''
    def prepare(self):
        '''
        Look up the operator function for the constant operator in kids[0].
        '''
        assert len(self.kids) == 2
        operkid = self.kids[0]
        assert isinstance(operkid, Const)

        self._operfunc = _UnaryExprFuncMap[operkid.value()]

    async def compute(self, runt, path):
        '''
        Apply the operator function to the computed operand (kids[1]).
        '''
        operand = await self.kids[1].compute(runt, path)
        return await self._operfunc(operand)
[docs]
class ExprNode(Value):
    '''
    A binary (i.e. two argument) expression node
    '''
    def prepare(self):
        '''
        Look up the operator function for the constant operator in kids[1].
        '''
        assert len(self.kids) == 3
        operkid = self.kids[1]
        assert isinstance(operkid, Const)

        self._operfunc = _ExprFuncMap[operkid.value()]

    async def compute(self, runt, path):
        '''
        Compute both operands (left first) and apply the operator function.

        Raises:
            StormRuntimeError: On division by zero (highlighting the right
                operand) or on decimal.InvalidOperation (highlighting this node).
        '''
        left = await self.kids[0].compute(runt, path)
        right = await self.kids[2].compute(runt, path)

        try:
            return await self._operfunc(left, right)

        except ZeroDivisionError:
            exc = s_exc.StormRuntimeError(mesg='Cannot divide by zero')
            raise self.kids[2].addExcInfo(exc)

        except decimal.InvalidOperation:
            exc = s_exc.StormRuntimeError(mesg='Invalid operation on a Number')
            raise self.addExcInfo(exc)
[docs]
class ExprOrNode(Value):
    '''
    Short-circuiting boolean "or" of kids[0] and kids[2].
    '''
    async def compute(self, runt, path):
        '''
        Return True if the left operand is true, otherwise tobool() of the right operand.
        '''
        left = await self.kids[0].compute(runt, path)
        if not await tobool(left):
            # only evaluate the right side when the left side is false
            right = await self.kids[2].compute(runt, path)
            return await tobool(right)
        return True
[docs]
class ExprAndNode(Value):
    '''
    Short-circuiting boolean "and" of kids[0] and kids[2].
    '''
    async def compute(self, runt, path):
        '''
        Return False if the left operand is false, otherwise tobool() of the right operand.
        '''
        left = await self.kids[0].compute(runt, path)
        if await tobool(left):
            # only evaluate the right side when the left side is true
            right = await self.kids[2].compute(runt, path)
            return await tobool(right)
        return False
[docs]
class TagName(Value):
    '''
    A tag name made of one or more kids whose values are joined with ".".
    '''
    def prepare(self):
        '''
        Precompute the joined name when every kid is a Const.
        '''
        allconst = all(isinstance(kid, Const) for kid in self.kids)
        self.isconst = allconst
        self.constval = '.'.join([kid.value() for kid in self.kids]) if allconst and self.kids else None

    async def compute(self, runt, path):
        '''
        Return the tag name as a string.

        Constant names are returned as precomputed. When the first kid is not
        a Const its value alone supplies the name; otherwise every kid is
        computed, normalized with runt.snap.getTagNorm() and joined with ".".

        Raises:
            BadTypeValu: If a leading non-constant value is not a string, or
                if any part computes to None.
        '''
        if self.isconst:
            return self.constval

        head = self.kids[0]
        if not isinstance(head, Const):
            valu = await s_stormtypes.toprim(await head.compute(runt, path))
            if not isinstance(valu, str):
                mesg = 'Invalid value type for tag name, tag names must be strings.'
                raise s_exc.BadTypeValu(mesg=mesg)

            normtupl = await runt.snap.getTagNorm(valu)
            return normtupl[0]

        norms = []
        for kid in self.kids:
            part = await kid.compute(runt, path)
            if part is None:
                mesg = f'Null value from var ${kid.name} is not allowed in tag names.'
                raise kid.addExcInfo(s_exc.BadTypeValu(mesg=mesg))

            partnorm = await runt.snap.getTagNorm(await tostr(part))
            norms.append(partnorm[0])

        return '.'.join(norms)

    async def computeTagArray(self, runt, path, excignore=()):
        '''
        Return the tag name(s) as a sequence.

        Args:
            runt: The Storm runtime.
            path: The current path.
            excignore: Exception classes to swallow per value when the first
                kid is not a Const and may supply several names.

        Returns:
            A one element tuple for constant or multi-part names; a list of
            normalized names when the first kid is not a Const.

        Raises:
            BadTypeValu: For non-string values (unless ignored) or None parts.
        '''
        if self.isconst:
            return (self.constval,)

        head = self.kids[0]
        if not isinstance(head, Const):

            vals = await s_stormtypes.toprim(await head.compute(runt, path))
            if not isinstance(vals, (tuple, list, set)):
                vals = (vals,)

            tags = []
            for valu in vals:
                try:
                    if not isinstance(valu, str):
                        mesg = 'Invalid value type for tag name, tag names must be strings.'
                        raise s_exc.BadTypeValu(mesg=mesg)

                    normtupl = await runt.snap.getTagNorm(valu)
                    if normtupl is not None:
                        tags.append(normtupl[0])

                except excignore:
                    continue

            return tags

        norms = []
        for kid in self.kids:
            part = await kid.compute(runt, path)
            if part is None:
                mesg = f'Null value from var ${kid.name} is not allowed in tag names.'
                raise kid.addExcInfo(s_exc.BadTypeValu(mesg=mesg))

            partnorm = await runt.snap.getTagNorm(await tostr(part))
            norms.append(partnorm[0])

        return ('.'.join(norms),)
[docs]
class TagMatch(TagName):
    '''
    Like TagName, but can have asterisks
    '''
    def hasglob(self):
        '''
        Return True if any Const kid contains "*" in its value.
        '''
        assert self.kids
        # TODO support vars with asterisks?
        for kid in self.kids:
            if isinstance(kid, Const) and '*' in kid.valu:
                return True
        return False

    async def compute(self, runt, path):
        '''
        Return the tag match string without normalizing it.

        Raises:
            BadTypeValu: If a leading non-constant value is not a string, or
                if any part computes to None.
        '''
        if self.isconst:
            return self.constval

        head = self.kids[0]
        if not isinstance(head, Const):
            valu = await s_stormtypes.toprim(await head.compute(runt, path))
            if isinstance(valu, str):
                return valu

            mesg = 'Invalid value type for tag name, tag names must be strings.'
            raise s_exc.BadTypeValu(mesg=mesg)

        parts = []
        for kid in self.kids:
            part = await kid.compute(runt, path)
            if part is None:
                mesg = f'Null value from var ${kid.name} is not allowed in tag names.'
                raise s_exc.BadTypeValu(mesg=mesg)

            parts.append(await tostr(part))

        return '.'.join(parts)
[docs]
class Const(Value):
    '''
    A Value node which holds a fixed value.
    '''
    def __init__(self, astinfo, valu, kids=()):
        Value.__init__(self, astinfo, kids=kids)
        self.valu = valu
        self.isconst = True

    def repr(self):
        clsname = self.__class__.__name__
        return f'{clsname}: {self.valu}'

    def isRuntSafe(self, runt):
        # a constant never depends on the pipeline
        return True

    def value(self):
        '''
        Return the held value synchronously.
        '''
        return self.valu

    async def compute(self, runt, path):
        '''
        Return the held value; runt and path are not used.
        '''
        return self.valu
[docs]
class ExprDict(Value):
    '''
    A dictionary expression; kids alternate key, value, key, value...
    '''
    def prepare(self):
        '''
        Pre-encode the dictionary with msgpack when every kid is a Const
        other than an EmbedQuery.
        '''
        self.const = None

        for kid in self.kids:
            if not isinstance(kid, Const) or isinstance(kid, EmbedQuery):
                return

        items = {}
        for indx in range(0, len(self.kids), 2):
            items[self.kids[indx].value()] = self.kids[indx + 1].value()

        self.const = s_msgpack.en(items)

    async def compute(self, runt, path):
        '''
        Return a s_stormtypes.Dict built from the kids.

        A pre-encoded constant is decoded anew on every call. Otherwise each
        key is computed and converted with toprim() before its value is computed.

        Raises:
            BadArg: If a computed key is mutable according to s_stormtypes.ismutable().
        '''
        if self.const is not None:
            return s_stormtypes.Dict(s_msgpack.un(self.const))

        items = {}
        for indx in range(0, len(self.kids), 2):

            key = await self.kids[indx].compute(runt, path)
            if s_stormtypes.ismutable(key):
                key = await s_stormtypes.torepr(key)
                raise s_exc.BadArg(mesg='Mutable values are not allowed as dictionary keys', name=key)

            key = await toprim(key)
            items[key] = await self.kids[indx + 1].compute(runt, path)

        return s_stormtypes.Dict(items)
[docs]
class ExprList(Value):
    '''
    A list expression whose kids are the list elements.
    '''
    def prepare(self):
        '''
        Pre-encode the list with msgpack when every kid is a Const other
        than an EmbedQuery.
        '''
        self.const = None

        for kid in self.kids:
            if not isinstance(kid, Const) or isinstance(kid, EmbedQuery):
                return

        self.const = s_msgpack.en([kid.value() for kid in self.kids])

    async def compute(self, runt, path):
        '''
        Return a s_stormtypes.List; a pre-encoded constant is decoded anew on every call.
        '''
        if self.const is None:
            valus = [await kid.compute(runt, path) for kid in self.kids]
            return s_stormtypes.List(valus)

        return s_stormtypes.List(list(s_msgpack.un(self.const)))
[docs]
class VarList(Const):
    '''
    A Const subclass which adds no behavior of its own.
    '''
[docs]
class Cmpr(Const):
    '''
    A Const subclass which adds no behavior of its own.
    '''
[docs]
class Bool(Const):
    '''
    A Const subclass which adds no behavior of its own.
    '''
[docs]
class EmbedQuery(Const):
    '''
    An embedded query constant which computes to a s_stormtypes.Query.
    '''
    # runt-safety checks do not recurse into the kids of this node
    runtopaque = True

    def validate(self, runt):
        '''
        No-op: var scope validation occurs in the sub-runtime.
        '''

    def hasVarName(self, name):
        # similar to validate(), the sub-runtime handles var scoping
        return False

    def getRuntVars(self, runt):
        '''
        A generator which yields nothing.
        '''
        yield from ()

    async def compute(self, runt, path):
        '''
        Return a Query for the held text with a snapshot of the visible variables.

        Runtime scope variables are copied first; path variables (if a path
        is given) then override them.
        '''
        scopevars = dict(runt.getScopeVars())

        if path is not None:
            scopevars.update(path.vars)

        return s_stormtypes.Query(self.valu, scopevars, runt, path=path)
[docs]
class List(Value):
    '''
    A list of values which computes to a plain python list.
    '''
    def prepare(self):
        # constant only when every element is a Const
        self.isconst = all(isinstance(kid, Const) for kid in self.kids)

    def repr(self):
        return 'List: %s' % self.kids

    async def compute(self, runt, path):
        '''
        Return a list with the computed value of every kid, in order.
        '''
        valus = []
        for kid in self.kids:
            valus.append(await kid.compute(runt, path))
        return valus
[docs]
class PropName(Value):
    '''
    A property name taken from the first kid.
    '''
    def prepare(self):
        namekid = self.kids[0]
        self.isconst = isinstance(namekid, Const)

    async def compute(self, runt, path):
        '''
        Return the computed value of the first kid unchanged.
        '''
        namekid = self.kids[0]
        return await namekid.compute(runt, path)
[docs]
class RelProp(PropName):
    '''
    A PropName subclass which adds no behavior of its own.
    '''
[docs]
class UnivProp(RelProp):
    '''
    A universal property name.
    '''
    async def compute(self, runt, path):
        '''
        Return the name as a string.

        A constant name is returned as is; a computed name is prefixed with ".".
        '''
        name = await tostr(await self.kids[0].compute(runt, path))
        return name if self.isconst else '.' + name
[docs]
class EditParens(Edit):
    '''
    A parenthesized edit group whose first kid is an EditNodeAdd.

    The node add and the remaining edit operations are run as their own
    pipeline so that the edits apply to the added nodes.
    '''
    async def run(self, runt, genr):
        '''
        Yield the inbound nodes followed by the results of the grouped edits.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        nodeadd = self.kids[0]
        assert isinstance(nodeadd, EditNodeAdd)

        formname = await nodeadd.kids[0].compute(runt, None)
        runt.layerConfirm(('node', 'add', formname))

        # create an isolated generator for the add vs edit
        if not nodeadd.isRuntSafe(runt):

            # do a little genr-jig.
            async for node, path in genr:

                formname = await nodeadd.kids[0].compute(runt, path)
                form = runt.model.form(formname)

                yield node, path

                async def editgenr():
                    async for added in nodeadd.addFromPath(form, runt, path):
                        yield added

                # chain the remaining edit opers onto the nodes added for this path
                fullgenr = editgenr()
                for oper in self.kids[1:]:
                    fullgenr = oper.run(runt, fullgenr)

                async for item in fullgenr:
                    yield item

            return

        # Luke, let the (node,path) tuples flow through you
        async for item in genr:
            yield item

        # isolated runtime stack...
        subgenr = s_common.agen()
        for oper in self.kids:
            subgenr = oper.run(runt, subgenr)

        async for item in subgenr:
            yield item
[docs]
class EditNodeAdd(Edit):
    '''
    Edit operation which adds nodes: kids are (form name, operator, value).

    With the "?=" operator, BadTypeValu errors raised while adding are ignored.
    '''
    def prepare(self):
        assert isinstance(self.kids[0], FormName)
        assert isinstance(self.kids[1], Const)

        self.oper = self.kids[1].value()
        self.excignore = (s_exc.BadTypeValu, ) if self.oper == '?=' else ()

    async def addFromPath(self, form, runt, path):
        '''
        Add a node using the context from path.

        NOTE: CALLER MUST CHECK PERMS

        Args:
            form: The model form to add nodes of.
            runt: The Storm runtime.
            path: The path used to compute the value(s).

        Yields:
            (node, path) tuples for each node added.
        '''
        vals = await self.kids[2].compute(runt, path)

        try:
            if isinstance(form.type, s_types.Guid):
                vals = await s_stormtypes.toprim(vals)

            for valu in form.type.getTypeVals(vals):
                try:
                    newn = await runt.snap.addNode(form.name, valu)
                except self.excignore:
                    pass
                else:
                    yield newn, runt.initPath(newn)

        except self.excignore:
            await asyncio.sleep(0)

    async def run(self, runt, genr):
        '''
        Add nodes and yield them along with the inbound nodes.

        When the operation is runtsafe the inbound nodes are passed through
        first and the add is executed once. Otherwise the add is executed
        once per inbound node, yielding the added nodes before that node.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            NoSuchForm: If the form name does not resolve (or the error
                produced by typeerr() for a non-string name).
        '''
        # the behavior here is a bit complicated...

        # single value add (runtime computed per node )
        # In the cases below, $hehe is input to the storm runtime vars.
        # case 1: [ foo:bar="lols" ]
        # case 2: [ foo:bar=$hehe ]
        # case 2: [ foo:bar=$lib.func(20, $hehe) ]
        # case 3: ($foo, $bar) = $hehe [ foo:bar=($foo, $bar) ]

        # iterative add ( node add is executed once per inbound node )
        # case 1: <query> [ foo:bar=(:baz, 20) ]
        # case 2: <query> [ foo:bar=($node, 20) ]
        # case 2: <query> $blah=:baz [ foo:bar=($blah, 20) ]

        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        runtsafe = self.isRuntSafe(runt)

        async def reqform(path):
            # Resolve the form for path: the name is computed and the add
            # permission confirmed before the model lookup.
            # must reach back first to trigger sudo / etc
            name = await self.kids[0].compute(runt, path)
            formname = await tostr(name)
            runt.layerConfirm(('node', 'add', formname))

            form = runt.model.form(formname)
            if form is None:
                if (exc := await s_stormtypes.typeerr(name, str)) is None:
                    exc = s_exc.NoSuchForm.init(formname)
                raise self.kids[0].addExcInfo(exc)

            return formname, form

        async def feedfunc():

            if not runtsafe:

                async for node, path in genr:

                    _formname, form = await reqform(path)

                    # must use/resolve all variables from path before yield
                    async for item in self.addFromPath(form, runt, path):
                        yield item

                    yield node, path
                    await asyncio.sleep(0)

            else:

                formname, form = await reqform(None)

                valu = await self.kids[2].compute(runt, None)
                valu = await s_stormtypes.tostor(valu)

                try:
                    for valu in form.type.getTypeVals(valu):
                        try:
                            node = await runt.snap.addNode(formname, valu)
                        except self.excignore:
                            continue

                        yield node, runt.initPath(node)
                        await asyncio.sleep(0)

                except self.excignore:
                    await asyncio.sleep(0)

        if runtsafe:
            async for node, path in genr:
                yield node, path

        async with contextlib.aclosing(s_base.schedGenr(feedfunc())) as agen:
            async for item in agen:
                yield item
[docs]
class EditPropSet(Edit):
    '''
    Edit operation which sets a property on each inbound node.

    The kids are (property name, operator, value). The "?" operator variants
    ignore BadTypeValu raised while computing or applying the value. The
    "+=" / "-=" variants add to / remove from the current array value and
    are only valid on array properties.
    '''
    async def run(self, runt, genr):
        '''
        Apply the property edit to every inbound node and yield it.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            NoSuchProp: If the form has no such property (or the error
                produced by typeerr() for a non-string name).
            StormRuntimeError: If an add/remove operator is used on a
                property which is not an array.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        oper = await self.kids[1].compute(runt, None)
        excignore = (s_exc.BadTypeValu,) if oper in ('?=', '?+=', '?-=') else ()

        isadd = oper in ('+=', '?+=')
        issub = oper in ('-=', '?-=')
        rval = self.kids[2]
        expand = True

        async for node, path in genr:

            propname = await self.kids[0].compute(runt, path)
            name = await tostr(propname)

            prop = node.form.props.get(name)
            if prop is None:
                if (exc := await s_stormtypes.typeerr(propname, str)) is None:
                    mesg = f'No property named {name}.'
                    exc = s_exc.NoSuchProp(mesg=mesg, name=name, form=node.form.name)

                raise self.kids[0].addExcInfo(exc)

            if not node.form.isrunt:
                # runt node property permissions are enforced by the callback
                runt.confirmPropSet(prop)

            isndef = isinstance(prop.type, s_types.Ndef)
            isarray = isinstance(prop.type, s_types.Array)

            try:

                if isarray and isinstance(rval, SubQuery):
                    # a subquery already produces a sequence of values
                    valu = await rval.compute_array(runt, path)
                    expand = False

                else:
                    valu = await rval.compute(runt, path)

                valu = await s_stormtypes.tostor(valu, isndef=isndef)

                if isadd or issub:

                    if not isarray:
                        mesg = f'Property set using ({oper}) is only valid on arrays.'
                        # pass the message by keyword like every other exception in this
                        # module so it is carried in the exception info
                        exc = s_exc.StormRuntimeError(mesg=mesg)
                        raise self.kids[0].addExcInfo(exc)

                    arry = node.get(name)
                    if arry is None:
                        arry = ()

                    # make arry mutable
                    arry = list(arry)

                    if expand:
                        valu = (valu,)

                    if isadd:
                        arry.extend(valu)

                    else:
                        assert issub
                        # we cant remove something we cant norm...
                        # but that also means it can't be in the array so...
                        for v in valu:
                            norm, info = prop.type.arraytype.norm(v)
                            try:
                                arry.remove(norm)
                            except ValueError:
                                pass

                    valu = arry

                if isinstance(prop.type, s_types.Ival):
                    # merge a new interval with any existing interval value
                    oldv = node.get(name)
                    if oldv is not None:
                        valu, _ = prop.type.norm(valu)
                        valu = prop.type.merge(oldv, valu)

                await node.set(name, valu)

            except excignore:
                pass

            yield node, path

            await asyncio.sleep(0)
[docs]
class EditPropDel(Edit):
    '''
    Edit operation which removes a property (named by kids[0]) from each inbound node.
    '''
    async def run(self, runt, genr):
        '''
        Pop the property from every inbound node and yield it.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            NoSuchProp: If the form has no such property (or the error
                produced by typeerr() for a non-string name).
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        async for node, path in genr:

            propname = await self.kids[0].compute(runt, path)
            name = await tostr(propname)

            prop = node.form.props.get(name)
            if prop is None:
                exc = await s_stormtypes.typeerr(propname, str)
                if exc is None:
                    mesg = f'No property named {name}.'
                    exc = s_exc.NoSuchProp(mesg=mesg, name=name, form=node.form.name)

                raise self.kids[0].addExcInfo(exc)

            runt.confirmPropDel(prop)

            await node.pop(name)

            yield node, path
            await asyncio.sleep(0)
[docs]
class EditUnivDel(Edit):
    '''
    Edit operation which removes a universal property (kids[0]) from each inbound node.
    '''
    async def run(self, runt, genr):
        '''
        Pop the universal property from every inbound node and yield it.

        A constant name is resolved and checked once up front; a computed
        name is resolved and checked for every node.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            NoSuchProp: If the model has no property with the name.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        univprop = self.kids[0]
        assert isinstance(univprop, UnivProp)

        def requniv(propname):
            # the name must exist in the model properties
            if runt.model.props.get(propname) is None:
                mesg = f'No property named {propname}.'
                exc = s_exc.NoSuchProp(mesg=mesg, name=propname)
                raise self.kids[0].addExcInfo(exc)

        if univprop.isconst:
            name = await self.kids[0].compute(None, None)
            requniv(name)

        async for node, path in genr:

            if not univprop.isconst:
                name = await univprop.compute(runt, path)
                requniv(name)

            runt.layerConfirm(('node', 'prop', 'del', name))

            await node.pop(name)

            yield node, path
            await asyncio.sleep(0)
[docs]
class N1Walk(Oper):
    '''
    Walk edges from each inbound node using node.iterEdgesN1().

    The kids are (verbs, destination forms) optionally followed by
    (comparator, comparison value).
    '''
    def __init__(self, astinfo, kids=(), isjoin=False, reverse=False):
        Oper.__init__(self, astinfo, kids=kids)
        self.reverse = reverse
        self.isjoin = isjoin

    def repr(self):
        clsname = self.__class__.__name__
        return f'{clsname}: {self.kids}, isjoin={self.isjoin}'

    async def walkNodeEdges(self, runt, node, verb=None):
        '''
        Yield (verb, node) for each N1 edge of node whose target node can be loaded.
        '''
        async for edgeverb, iden in node.iterEdgesN1(verb=verb):
            target = await runt.snap.getNodeByBuid(s_common.uhex(iden))
            if target is None:
                continue
            yield edgeverb, target

    def buildfilter(self, runt, destforms, cmpr):
        '''
        Build the destination filter for walked nodes.

        Args:
            runt: The Storm runtime.
            destforms: A name or list/tuple of names (forms, properties, or
                names resolved with reqFormsByLook()); "*" matches anything.
            cmpr: Optional comparator to apply to the matched value.

        Returns:
            False for a wildcard destination, otherwise an async callable
            destfilt(node, path, cmprvalu) returning a truth value.

        Raises:
            StormRuntimeError: If a comparator is used with a wildcard destination.
        '''
        if not isinstance(destforms, (tuple, list)):
            destforms = (destforms,)

        if '*' in destforms:
            if cmpr is None:
                return False

            mesg = 'Wild card walk operations do not support comparison.'
            raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

        forms = set()
        formprops = collections.defaultdict(dict)

        for destform in destforms:

            prop = runt.model.prop(destform)
            if prop is None:
                formlist = runt.model.reqFormsByLook(destform, extra=self.kids[0].addExcInfo)
                forms.update(formlist)
                continue

            if prop.isform:
                forms.add(destform)
            else:
                # secondary properties are grouped by the name of their form
                formprops[prop.form.name][prop.name] = prop

        if cmpr is None:

            async def existfilt(node, path, cmprvalu):
                if node.form.full in forms:
                    return True

                props = formprops.get(node.form.full)
                if props is not None:
                    for propname in props:
                        if node.get(propname) is not None:
                            return True

                return False

            return existfilt

        async def cmprfilt(node, path, cmprvalu):
            if node.form.full in forms:
                return node.form.type.cmpr(node.ndef[1], cmpr, cmprvalu)

            props = formprops.get(node.form.full)
            if props is not None:
                for propname, prop in props.items():
                    propvalu = node.get(propname)
                    if propvalu is not None and prop.type.cmpr(propvalu, cmpr, cmprvalu):
                        return True

            return False

        return cmprfilt

    async def run(self, runt, genr):
        '''
        Yield the nodes reached over matching edges from each inbound node.

        When isjoin is set the inbound node is yielded first. The destination
        filter is rebuilt per node unless the destination kid is constant.

        Raises:
            StormRuntimeError: If the verbs value is not a string, list or tuple.
        '''
        cmpr = None
        cmprvalu = None
        destfilt = None

        if len(self.kids) == 4:
            cmpr = await self.kids[2].compute(runt, None)

        async for node, path in genr:

            if self.isjoin:
                yield node, path

            verbs = await s_stormtypes.toprim(await self.kids[0].compute(runt, path))

            if not isinstance(verbs, (str, list, tuple)):
                mesg = f'walk operation expected a string or list. got: {verbs!r}.'
                raise self.kids[0].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

            if isinstance(verbs, str):
                verbs = (verbs,)

            if cmpr is not None:
                cmprvalu = await self.kids[3].compute(runt, path)

            if destfilt is None or not self.kids[1].isconst:
                dest = await s_stormtypes.toprim(await self.kids[1].compute(runt, path))
                destfilt = self.buildfilter(runt, dest, cmpr)

            for verb in verbs:
                verb = await s_stormtypes.tostr(verb)

                # a "*" verb walks edges of any verb
                if verb == '*':
                    verb = None

                async for verbname, walknode in self.walkNodeEdges(runt, node, verb=verb):

                    if destfilt and not await destfilt(walknode, path, cmprvalu):
                        continue

                    link = {'type': 'edge', 'verb': verbname}
                    if self.reverse:
                        link['reverse'] = True

                    yield walknode, path.fork(walknode, link)
[docs]
class N2Walk(N1Walk):
    '''
    Like N1Walk but walks edges using node.iterEdgesN2(); always marked reverse.
    '''
    def __init__(self, astinfo, kids=(), isjoin=False):
        N1Walk.__init__(self, astinfo, kids=kids, isjoin=isjoin, reverse=True)

    async def walkNodeEdges(self, runt, node, verb=None):
        '''
        Yield (verb, node) for each N2 edge of node whose source node can be loaded.
        '''
        async for edgeverb, iden in node.iterEdgesN2(verb=verb):
            source = await runt.snap.getNodeByBuid(s_common.uhex(iden))
            if source is None:
                continue
            yield edgeverb, source
[docs]
class EditEdgeAdd(Edit):
    '''
    Edit operation which adds edges between each inbound node and the nodes
    produced by a subquery. The kids are (verb, SubQuery).
    '''
    def __init__(self, astinfo, kids=(), n2=False):
        Edit.__init__(self, astinfo, kids=kids)
        self.n2 = n2

    async def run(self, runt, genr):
        '''
        Add the edges for every inbound node and yield it.

        With n2 set the edge is added on each subquery node pointing at the
        inbound node; otherwise edges are added on the inbound node through
        an editor, applying the edits whenever 1000 or more are pending.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            IsRuntForm: If either end of an edge is a runt node.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        # SubQuery -> Query
        query = self.kids[1].kids[0]

        confirmed = set()

        def allowed(x):
            # confirm the add permission only once per verb
            if x not in confirmed:
                runt.layerConfirm(('node', 'edge', 'add', x))
                confirmed.add(x)

        def reqNotRunt(item):
            if item.form.isrunt:
                mesg = f'Edges cannot be used with runt nodes: {item.form.full}'
                raise self.addExcInfo(s_exc.IsRuntForm(mesg=mesg, form=item.form.full))

        async for node, path in genr:

            reqNotRunt(node)

            iden = node.iden()
            verb = await tostr(await self.kids[0].compute(runt, path))

            allowed(verb)

            async with runt.getSubRuntime(query) as subr:

                if self.n2:
                    async for subn, subp in subr.execute():
                        reqNotRunt(subn)
                        await subn.addEdge(verb, iden)

                else:
                    async with node.snap.getEditor() as editor:
                        proto = editor.loadNode(node)

                        async for subn, subp in subr.execute():
                            reqNotRunt(subn)

                            await proto.addEdge(verb, subn.iden())
                            await asyncio.sleep(0)

                            if len(proto.edges) < 1000:
                                continue

                            # flush the pending edits in batches
                            nodeedits = editor.getNodeEdits()
                            if nodeedits:
                                await node.snap.applyNodeEdits(nodeedits)
                            proto.edges.clear()

            yield node, path
[docs]
class EditEdgeDel(Edit):
    '''
    Edit operation which removes edges between each inbound node and the nodes
    produced by a subquery. The kids are (verb, SubQuery).
    '''
    def __init__(self, astinfo, kids=(), n2=False):
        Edit.__init__(self, astinfo, kids=kids)
        self.n2 = n2

    async def run(self, runt, genr):
        '''
        Remove the edges for every inbound node and yield it.

        With n2 set the edge is removed from each subquery node; otherwise
        edges are removed from the inbound node through an editor, applying
        the edits whenever 1000 or more deletions are pending.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
            IsRuntForm: If either end of an edge is a runt node.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        # SubQuery -> Query
        query = self.kids[1].kids[0]

        confirmed = set()

        def allowed(x):
            # confirm the del permission only once per verb
            if x not in confirmed:
                runt.layerConfirm(('node', 'edge', 'del', x))
                confirmed.add(x)

        def reqNotRunt(item):
            if item.form.isrunt:
                mesg = f'Edges cannot be used with runt nodes: {item.form.full}'
                raise self.addExcInfo(s_exc.IsRuntForm(mesg=mesg, form=item.form.full))

        async for node, path in genr:

            reqNotRunt(node)

            iden = node.iden()
            verb = await tostr(await self.kids[0].compute(runt, path))

            allowed(verb)

            async with runt.getSubRuntime(query) as subr:

                if self.n2:
                    async for subn, subp in subr.execute():
                        reqNotRunt(subn)
                        await subn.delEdge(verb, iden)

                else:
                    async with node.snap.getEditor() as editor:
                        proto = editor.loadNode(node)

                        async for subn, subp in subr.execute():
                            reqNotRunt(subn)

                            await proto.delEdge(verb, subn.iden())
                            await asyncio.sleep(0)

                            if len(proto.edgedels) < 1000:
                                continue

                            # flush the pending edits in batches
                            nodeedits = editor.getNodeEdits()
                            if nodeedits:
                                await node.snap.applyNodeEdits(nodeedits)
                            proto.edgedels.clear()

            yield node, path
[docs]
class EditTagAdd(Edit):
    '''
    Edit operation which adds tag(s) to each inbound node.

    A leading Const kid equal to "?" (when there is more than one kid)
    makes BadTypeValu errors non-fatal and shifts the other kids by one.
    '''
    async def run(self, runt, genr):
        '''
        Add the tag(s), with an optional value, to every inbound node and yield it.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        oper_offset = 0
        if len(self.kids) > 1 and isinstance(self.kids[0], Const):
            if (await self.kids[0].compute(runt, None)) == '?':
                oper_offset = 1

        excignore = (s_exc.BadTypeValu,) if oper_offset == 1 else ()

        hasval = len(self.kids) > 2 + oper_offset

        # the tag value used when no value kid is present
        valu = (None, None)

        async for node, path in genr:

            try:
                names = await self.kids[oper_offset].computeTagArray(runt, path, excignore=excignore)
            except excignore:
                yield node, path
                await asyncio.sleep(0)
                continue

            for name in names:

                try:
                    runt.layerConfirm(('node', 'tag', 'add', *name.split('.')))

                    if hasval:
                        valu = await self.kids[2 + oper_offset].compute(runt, path)
                        valu = await s_stormtypes.toprim(valu)

                    await node.addTag(name, valu=valu)

                except excignore:
                    pass

            yield node, path

            await asyncio.sleep(0)
[docs]
class EditTagDel(Edit):
    '''
    Edit operation which removes tag(s) (named by kids[0]) from each inbound node.
    '''
    async def run(self, runt, genr):
        '''
        Delete the tag(s) from every inbound node and yield it.

        BadTypeValu errors for individual tag values are ignored while the
        tag names are computed.

        Raises:
            IsReadOnly: If the runtime is in readonly mode.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        async for node, path in genr:

            tagnames = await self.kids[0].computeTagArray(runt, path, excignore=(s_exc.BadTypeValu,))

            for tagname in tagnames:
                # the permission includes each dotted part of the tag name
                runt.layerConfirm(('node', 'tag', 'del', *tagname.split('.')))
                await node.delTag(tagname)

            yield node, path

            await asyncio.sleep(0)
[docs]
class EditTagPropSet(Edit):
    '''
    Set a tag property on inbound nodes.

    Example:

        [ #foo.bar:baz=10 ]
    '''

    async def run(self, runt, genr):
        '''
        Set the tag property on each inbound node and yield the node onward.

        The kids are ( tagprop, operator, value ). When the operator computes to
        "?=", s_exc.BadTypeValu errors raised by node.setTagProp() are ignored.

        Args:
            runt: The Storm runtime executing the query.
            genr: An async generator of (node, path) tuples.

        Raises:
            s_exc.IsReadOnly: If the runtime is in readonly mode.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        oper = await self.kids[1].compute(runt, None)

        # an empty tuple in an except clause matches nothing
        excignore = ()
        if oper == '?=':
            excignore = s_exc.BadTypeValu

        async for node, path in genr:

            tag, prop = await self.kids[0].compute(runt, path)

            rawv = await self.kids[2].compute(runt, path)
            valu = await s_stormtypes.tostor(rawv)

            # for now, use the tag add perms
            perm = ('node', 'tag', 'add') + tuple(tag.split('.'))
            runt.layerConfirm(perm)

            try:
                await node.setTagProp(tag, prop, valu)
            except asyncio.CancelledError:  # pragma: no cover
                raise
            except excignore:
                pass

            yield node, path

            await asyncio.sleep(0)
[docs]
class EditTagPropDel(Edit):
    '''
    Remove a tag property from inbound nodes.

    Example:

        [ -#foo.bar:baz ]
    '''

    async def run(self, runt, genr):
        '''
        Delete the tag property from each inbound node and yield the node onward.

        Args:
            runt: The Storm runtime executing the query.
            genr: An async generator of (node, path) tuples.

        Raises:
            s_exc.IsReadOnly: If the runtime is in readonly mode.
        '''
        if runt.readonly:
            mesg = 'Storm runtime is in readonly mode, cannot create or edit nodes and other graph data.'
            raise self.addExcInfo(s_exc.IsReadOnly(mesg=mesg))

        async for node, path in genr:

            tag, prop = await self.kids[0].compute(runt, path)

            # tag property removal is gated by the tag del permission for the tag
            perm = ('node', 'tag', 'del') + tuple(tag.split('.'))
            runt.layerConfirm(perm)

            await node.delTagProp(tag, prop)

            yield node, path

            await asyncio.sleep(0)
[docs]
class BreakOper(AstNode):
    '''
    Operator which raises s_stormctrl.StormBreak.
    '''

    async def run(self, runt, genr):
        '''
        Raise StormBreak, carrying the first inbound (node, path) if there is one.

        Args:
            runt: The Storm runtime executing the query (unused).
            genr: An async generator of (node, path) tuples.

        Raises:
            s_stormctrl.StormBreak: Always.
        '''
        # The loop below never executes; the yield only exists so that this
        # method is an async generator like every other run() implementation.
        for _nothing in ():
            yield _nothing

        async for node, path in genr:
            item = (node, path)
            raise s_stormctrl.StormBreak(item=item)

        # nothing was inbound, so there is no item to attach
        raise s_stormctrl.StormBreak()
[docs]
class ContinueOper(AstNode):
    '''
    Operator which raises s_stormctrl.StormContinue.
    '''

    async def run(self, runt, genr):
        '''
        Raise StormContinue, carrying the first inbound (node, path) if there is one.

        Args:
            runt: The Storm runtime executing the query (unused).
            genr: An async generator of (node, path) tuples.

        Raises:
            s_stormctrl.StormContinue: Always.
        '''
        # The loop below never executes; the yield only exists so that this
        # method is an async generator like every other run() implementation.
        for _nothing in ():
            yield _nothing

        async for node, path in genr:
            item = (node, path)
            raise s_stormctrl.StormContinue(item=item)

        # nothing was inbound, so there is no item to attach
        raise s_stormctrl.StormContinue()
[docs]
class IfClause(AstNode):
    '''
    A single conditional clause of an IfStmt.

    IfStmt unpacks the kids of each clause as ( condition expression, subquery ).
    '''
[docs]
class IfStmt(Oper):
    '''
    Conditional operator made of one or more IfClause kids and an optional
    trailing else query.
    '''

    def prepare(self):
        '''
        Split the kids into the conditional clauses and the optional else query.

        If the last kid is an IfClause there is no else query.
        '''
        lastkid = self.kids[-1]

        if isinstance(lastkid, IfClause):
            self.elsequery = None
            self.clauses = self.kids
            return

        self.elsequery = lastkid
        self.clauses = self.kids[:-1]

    async def _runtsafe_calc(self, runt):
        '''
        All conditions are runtsafe: figure out which clause wins.

        Returns:
            The subquery of the first clause whose condition is true, or the
            else query (which may be None) if no condition is true.
        '''
        for clause in self.clauses:
            expr, subq = clause.kids

            if await tobool(await expr.compute(runt, None)):
                return subq

        return self.elsequery

    async def run(self, runt, genr):
        '''
        Run the winning branch for each inbound node.

        A node for which no branch is selected is yielded unchanged. If nothing
        is inbound and every condition is runtsafe, the winning branch (if any)
        is executed once with an empty input.

        Args:
            runt: The Storm runtime executing the query.
            genr: An async generator of (node, path) tuples.
        '''
        # decided up front, before any inbound node is consumed
        allcondsafe = True
        for clause in self.clauses:
            if not clause.kids[0].isRuntSafe(runt):
                allcondsafe = False
                break

        sawnode = False

        async for node, path in genr:
            sawnode = True

            # fall back to the else query unless one of the conditions is true
            subq = self.elsequery
            for clause in self.clauses:
                expr, body = clause.kids

                if await tobool(await expr.compute(runt, path)):
                    subq = body
                    break

            if not subq:
                # If none of the if branches were executed and no else present, pass the stream through unaltered
                yield node, path
                continue

            assert isinstance(subq, SubQuery)

            async for item in subq.inline(runt, s_common.agen((node, path))):
                yield item

        if sawnode or not allcondsafe:
            return

        # no nodes and a runt safe value should execute the winning clause once
        subq = await self._runtsafe_calc(runt)
        if not subq:
            return

        async for item in subq.inline(runt, s_common.agen()):
            yield item
[docs]
class Return(Oper):
    '''
    Operator which raises s_stormctrl.StormReturn with an optional value.
    '''

    async def run(self, runt, genr):
        '''
        Raise StormReturn for the first inbound node, or once if nothing is inbound.

        The returned value is computed from the first kid when there is one and
        is None otherwise. With nothing inbound, the exception is only raised
        when this operator is runtsafe.

        Args:
            runt: The Storm runtime executing the query.
            genr: An async generator of (node, path) tuples.

        Raises:
            s_stormctrl.StormReturn: Carrying the computed value.
        '''
        # fake out a generator...
        for _nothing in ():
            yield _nothing  # pragma: no cover

        valu = None

        async for node, path in genr:
            if self.kids:
                valu = await self.kids[0].compute(runt, path)

            raise s_stormctrl.StormReturn(valu)

        # no items in pipeline... execute (only when runtsafe)
        if not self.isRuntSafe(runt):
            return

        if self.kids:
            valu = await self.kids[0].compute(runt, None)

        raise s_stormctrl.StormReturn(valu)
[docs]
class Emit(Oper):
    '''
    Operator which hands a computed value to runt.emit().
    '''

    async def run(self, runt, genr):
        '''
        Emit the value of the first kid for each inbound node, then yield the node.

        If nothing is inbound and this operator is runtsafe, the value is
        computed without a path and emitted exactly once.

        Args:
            runt: The Storm runtime executing the query.
            genr: An async generator of (node, path) tuples.
        '''
        empty = True

        async for node, path in genr:
            empty = False

            valu = await self.kids[0].compute(runt, path)
            await runt.emit(valu)

            yield node, path

        # no items in pipeline and runtsafe. execute once.
        if empty and self.isRuntSafe(runt):
            valu = await self.kids[0].compute(runt, None)
            await runt.emit(valu)
[docs]
class Stop(Oper):
    '''
    Operator which raises s_stormctrl.StormStop.
    '''

    async def run(self, runt, genr):
        '''
        Raise StormStop once the first inbound item arrives, or immediately
        after the inbound generator is exhausted without producing anything.

        Args:
            runt: The Storm runtime executing the query (unused).
            genr: An async generator of (node, path) tuples.

        Raises:
            s_stormctrl.StormStop: Always.
        '''
        # never executes; the yield makes this method an async generator
        for _nothing in ():
            yield _nothing

        async for node, path in genr:
            raise s_stormctrl.StormStop()

        raise s_stormctrl.StormStop()
[docs]
class FuncArgs(AstNode):
    '''
    Represents the function arguments in a function definition
    '''

    async def compute(self, runt, path):
        '''
        Compute the argument definitions as a list of 2-item entries.

        A CallKwarg kid contributes its computed value unchanged, after checking
        that the second item (the default) is not mutable. Any other kid
        contributes ( computed value, s_common.novalu ).

        Raises:
            s_exc.StormRuntimeError: If a default parameter value is mutable.
        '''
        argdefs = []

        for kid in self.kids:
            valu = await kid.compute(runt, path)

            if not isinstance(kid, CallKwarg):
                # no default: mark the slot with the novalu sentinel
                argdefs.append((valu, s_common.novalu))
                continue

            if s_stormtypes.ismutable(valu[1]):
                exc = s_exc.StormRuntimeError(mesg='Mutable default parameter value not allowed')
                raise kid.addExcInfo(exc)

            argdefs.append(valu)

        return argdefs
[docs]
class Function(AstNode):
    '''
    A Storm function definition with kids ( name, args, body ).

    Examples:

        // use args/kwargs syntax
        function bar(x, v=$(30)) {
        }

        # we auto-detect the behavior of the target function

        # return a value
        function bar(x, y) { return ($(x + y)) }

        # a function that produces nodes
        function bar(x, y) { [ baz:faz=(x, y) ] }

        $foo = $bar(10, v=20)
    '''
    runtopaque = True

    def prepare(self):
        '''
        Cache the function name and whether the AST contains Emit / Return nodes.
        '''
        namekid = self.kids[0]
        assert isinstance(namekid, Const)

        self.name = namekid.value()
        self.hasemit = self.hasAstClass(Emit)
        self.hasretn = self.hasAstClass(Return)

    def isRuntSafe(self, runt):
        '''
        A function definition always reports itself as runtsafe.
        '''
        return True

    async def run(self, runt, genr):
        '''
        Define the function as a runtime variable and pass inbound nodes through.

        The definition happens exactly once: before the first inbound node is
        yielded, or after the inbound generator ends if it produced nothing.

        Raises:
            s_exc.StormRuntimeError: If the argument definitions are not runtsafe.
        '''
        argskid = self.kids[1]
        if not argskid.isRuntSafe(runt):
            exc = s_exc.StormRuntimeError(mesg='Non-runtsafe default parameter value not allowed')
            raise argskid.addExcInfo(exc)

        async def once():
            # the argument definitions are computed once, at definition time
            argdefs = await argskid.compute(runt, None)

            @s_stormtypes.stormfunc(readonly=True)
            async def realfunc(*args, **kwargs):
                return await self.callfunc(runt, argdefs, args, kwargs)

            await runt.setVar(self.name, realfunc)

        defined = False

        async for node, path in genr:
            if not defined:
                await once()
                defined = True

            yield node, path

        if not defined:
            await once()

    def getRuntVars(self, runt):
        '''
        Yield the single ( name, True ) entry for the function name.
        '''
        yield (self.kids[0].value(), True)

    def validate(self, runt):
        '''
        Intentionally a no-op.
        '''
        # var scope validation occurs in the sub-runtime
        pass

    async def callfunc(self, runt, argdefs, args, kwargs):
        '''
        Execute a function call using the given runtime.

        Args:
            runt: The runtime used to create the sub-runtime for the body.
            argdefs: The ( name, default ) entries computed from the args kid.
            args: The positional arguments of the call.
            kwargs: The keyword arguments of the call. Matched names are popped.

        Returns:
            If the body contains a Return and no Emit: the item of the raised
            StormReturn, or None if the body completes without returning.
            Otherwise an async generator over the emitted items (if the body
            contains an Emit) or over the (node, path) tuples of the body.

        Raises:
            s_exc.StormRuntimeError: If the arguments do not match the definition.
        '''
        def argerr(mesg):
            # argument errors point at the args kid of the definition
            return self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

        argcount = len(args) + len(kwargs)
        if argcount > len(argdefs):
            mesg = f'{self.name}() takes {len(argdefs)} arguments but {argcount} were provided'
            raise argerr(mesg)

        mergargs = {}
        posnames = set()  # Positional argument names

        # Fill in the positional arguments; the count check above guarantees
        # there are at least as many definitions as positional values.
        for argdef, argv in zip(argdefs, args):
            name = argdef[0]
            mergargs[name] = argv
            posnames.add(name)

        # Merge in the rest from kwargs or the default values set at function definition
        for name, defv in argdefs[len(args):]:
            valu = kwargs.pop(name, s_common.novalu)
            if valu is s_common.novalu:
                valu = defv

            if valu is s_common.novalu:
                mesg = f'{self.name}() missing required argument {name}'
                raise argerr(mesg)

            mergargs[name] = valu

        if kwargs:
            # Repeated kwargs are caught at parse time, so query either repeated a positional parameter, or
            # used a kwarg not defined.
            kwkeys = list(kwargs)
            if kwkeys[0] in posnames:
                mesg = f'{self.name}() got multiple values for parameter {kwkeys[0]}'
            else:
                plural = 's' if len(kwargs) > 1 else ''
                mesg = f'{self.name}() got unexpected keyword argument{plural}: {",".join(kwkeys)}'

            raise argerr(mesg)

        assert len(mergargs) == len(argdefs)

        opts = {'vars': mergargs}

        returnsvalu = self.hasretn and not self.hasemit
        if returnsvalu:
            async with runt.getSubRuntime(self.kids[2], opts=opts) as subr:

                # inform the sub runtime to use function scope rules
                subr.funcscope = True

                try:
                    await asyncio.sleep(0)

                    # run the body to completion; only a StormReturn produces a value
                    async for _item in subr.execute():
                        await asyncio.sleep(0)

                    return None

                except s_stormctrl.StormReturn as e:
                    return e.item

        async def genr():
            async with runt.getSubRuntime(self.kids[2], opts=opts) as subr:

                # inform the sub runtime to use function scope rules
                subr.funcscope = True

                try:
                    await asyncio.sleep(0)

                    if self.hasemit:
                        async with contextlib.aclosing(await subr.emitter()) as agen:
                            async for item in agen:
                                yield item
                                await asyncio.sleep(0)
                    else:
                        async with contextlib.aclosing(subr.execute()) as agen:
                            async for node, path in agen:
                                yield node, path

                except s_stormctrl.StormStop:
                    # a StormStop ends the generator without an error
                    return

        return genr()