'''
An API to assist with the creation and enforcement of cortex data models.
'''
import asyncio
import logging
import collections
import regex
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.coro as s_coro
import synapse.lib.cache as s_cache
import synapse.lib.types as s_types
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.grammar as s_grammar
logger = logging.getLogger(__name__)
hexre = regex.compile('^[0-9a-z]+$')
PREFIX_CACHE_SIZE = 1000
[docs]
class TagProp:
def __init__(self, model, name, tdef, info):
self.name = name
self.info = info
self.tdef = tdef
self.model = model
self.locked = False
self.utf8 = name.encode()
self.nenc = name.encode() + b'\x00'
self.base = self.model.types.get(tdef[0])
if self.base is None:
raise s_exc.NoSuchType(name=tdef[0])
self.type = self.base.clone(tdef[1])
if isinstance(self.type, s_types.Array):
mesg = 'Tag props may not be array types (yet).'
raise s_exc.BadPropDef(mesg=mesg)
[docs]
def pack(self):
return {
'name': self.name,
'info': self.info,
'type': self.tdef,
'stortype': self.type.stortype,
}
[docs]
def getTagPropDef(self):
return (self.name, self.tdef, self.info)
[docs]
def getStorNode(self, form):
ndef = (form.name, form.type.norm(self.name)[0])
buid = s_common.buid(ndef)
props = {
'doc': self.info.get('doc', ''),
'type': self.type.name,
}
pnorms = {}
for prop, valu in props.items():
formprop = form.props.get(prop)
if formprop is not None and valu is not None:
pnorms[prop] = formprop.type.norm(valu)[0]
return (buid, {
'ndef': ndef,
'props': pnorms
})
[docs]
class Prop:
'''
The Prop class represents a property defined within the data model.
'''
def __init__(self, modl, form, name, typedef, info):
self.onsets = []
self.ondels = []
self.modl = modl
self.name = name
self.info = info
self.univ = None
if form is not None:
if name.startswith('.'):
self.univ = modl.prop(name)
self.full = '%s%s' % (form.name, name)
self.isext = name.startswith('._')
else:
self.full = '%s:%s' % (form.name, name)
self.isext = name.startswith('_')
self.isuniv = False
self.isrunt = form.isrunt
self.compoffs = form.type.getCompOffs(self.name)
else:
self.full = name
self.isuniv = True
self.isrunt = False
self.compoffs = None
self.isext = name.startswith('._')
self.isform = False # for quick Prop()/Form() detection
self.delperms = [('node', 'prop', 'del', self.full)]
self.setperms = [('node', 'prop', 'set', self.full)]
if form is not None:
self.setperms.append(('node', 'prop', 'set', form.name, self.name))
self.delperms.append(('node', 'prop', 'del', form.name, self.name))
self.setperms.reverse() # Make them in precedence order
self.delperms.reverse() # Make them in precedence order
self.form = form
self.type = None
self.typedef = typedef
self.alts = None
self.locked = False
self.deprecated = self.info.get('deprecated', False)
self.type = self.modl.getTypeClone(typedef)
self.typehash = self.type.typehash
if self.type.isarray:
self.arraytypehash = self.type.arraytype.typehash
if form is not None:
form.setProp(name, self)
self.modl.propsbytype[self.type.name][self.full] = self
if self.deprecated or self.type.deprecated:
async def depfunc(node, oldv):
mesg = f'The property {self.full} is deprecated or using a deprecated type and will be removed in 3.0.0'
await node.snap.warnonce(mesg)
self.onSet(depfunc)
def __repr__(self):
return f'DataModel Prop: {self.full}'
[docs]
def onSet(self, func):
'''
Add a callback for setting this property.
The callback is executed after the property is set.
Args:
func (function): A prop set callback.
The callback is called within the current transaction,
with the node, and the old property value (or None).
def func(node, oldv):
dostuff()
'''
self.onsets.append(func)
[docs]
def onDel(self, func):
'''
Add a callback for deleting this property.
The callback is executed after the property is deleted.
Args:
func (function): A prop del callback.
The callback is called within the current transaction,
with the node, and the old property value (or None).
def func(node, oldv):
dostuff()
'''
self.ondels.append(func)
[docs]
async def wasSet(self, node, oldv):
'''
Fire the onset() handlers for this property.
Args:
node (synapse.lib.node.Node): The node whose property was set.
oldv (obj): The previous value of the property.
'''
for func in self.onsets:
try:
await s_coro.ornot(func, node, oldv)
except asyncio.CancelledError:
raise
except Exception:
logger.exception('onset() error for %s' % (self.full,))
[docs]
async def wasDel(self, node, oldv):
for func in self.ondels:
try:
await s_coro.ornot(func, node, oldv)
except asyncio.CancelledError:
raise
except Exception:
logger.exception('ondel() error for %s' % (self.full,))
[docs]
def getCompOffs(self):
'''
Return the offset of this field within the compound primary prop or None.
'''
return self.compoffs
[docs]
def pack(self):
info = {
'name': self.name,
'full': self.full,
'type': self.typedef,
'stortype': self.type.stortype,
}
info.update(self.info)
return info
[docs]
def getPropDef(self):
return (self.name, self.typedef, self.info)
[docs]
def getStorNode(self, form):
ndef = (form.name, form.type.norm(self.full)[0])
buid = s_common.buid(ndef)
props = {
'doc': self.info.get('doc', ''),
'type': self.type.name,
'relname': self.name,
'univ': self.isuniv,
'base': self.name.split(':')[-1],
'ro': int(self.info.get('ro', False)),
'extmodel': self.isext,
}
if self.form is not None:
props['form'] = self.form.name
pnorms = {}
for prop, valu in props.items():
formprop = form.props.get(prop)
if formprop is not None and valu is not None:
pnorms[prop] = formprop.type.norm(valu)[0]
return (buid, {'props': pnorms, 'ndef': ndef})
[docs]
def getAlts(self):
'''
Return a list of Prop instances that are considered
alternative locations for our property value, including
self.
'''
if self.alts is None:
self.alts = [self]
for name in self.info.get('alts', ()):
self.alts.append(self.form.reqProp(name))
return self.alts
[docs]
class Edge:
def __init__(self, modl, edgetype, edgeinfo):
self.modl = modl
self.edgetype = edgetype
self.edgeinfo = edgeinfo
[docs]
def pack(self):
return (self.edgetype, self.edgeinfo)
[docs]
class Model:
'''
The data model used by a Cortex hypergraph.
'''
def __init__(self, core=None):
self.core = core
self.types = {} # name: Type()
self.forms = {} # name: Form()
self.props = {} # (form,name): Prop() and full: Prop()
self.edges = {} # (n1form, verb, n2form): Edge()
self.ifaces = {} # name: <ifdef>
self.tagprops = {} # name: TagProp()
self.formabbr = {} # name: [Form(), ... ]
self.modeldefs = []
self.univs = {}
self.allunivs = collections.defaultdict(list)
self.propsbytype = collections.defaultdict(dict) # name: Prop()
self.arraysbytype = collections.defaultdict(dict)
self.ifaceprops = collections.defaultdict(list)
self.formsbyiface = collections.defaultdict(list)
self.edgesbyn1 = collections.defaultdict(set)
self.edgesbyn2 = collections.defaultdict(set)
self.formprefixcache = s_cache.LruDict(PREFIX_CACHE_SIZE)
self._type_pends = collections.defaultdict(list)
self._modeldef = {
'ctors': [],
'types': [],
'forms': [],
'univs': [],
'edges': [],
}
# add the primitive base types
info = {'doc': 'The base 64 bit signed integer type.'}
item = s_types.Int(self, 'int', info, {})
self.addBaseType(item)
info = {'doc': 'The base floating point type.'}
item = s_types.Float(self, 'float', info, {})
self.addBaseType(item)
info = {'doc': 'A base range type.'}
item = s_types.Range(self, 'range', info, {'type': ('int', {})})
self.addBaseType(item)
info = {'doc': 'The base string type.'}
item = s_types.Str(self, 'str', info, {})
self.addBaseType(item)
info = {'doc': 'The base hex type.'}
item = s_types.Hex(self, 'hex', info, {})
self.addBaseType(item)
info = {'doc': 'The base boolean type.'}
item = s_types.Bool(self, 'bool', info, {})
self.addBaseType(item)
info = {'doc': 'A date/time value.'}
item = s_types.Time(self, 'time', info, {})
self.addBaseType(item)
info = {'doc': 'A duration value.'}
item = s_types.Duration(self, 'duration', info, {})
self.addBaseType(item)
info = {'doc': 'A time window/interval.'}
item = s_types.Ival(self, 'ival', info, {})
self.addBaseType(item)
info = {'doc': 'The base GUID type.'}
item = s_types.Guid(self, 'guid', info, {})
self.addBaseType(item)
info = {'doc': 'A tag component string.'}
item = s_types.TagPart(self, 'syn:tag:part', info, {})
self.addBaseType(item)
info = {'doc': 'The base type for a synapse tag.'}
item = s_types.Tag(self, 'syn:tag', info, {})
self.addBaseType(item)
info = {'doc': 'The base type for compound node fields.'}
item = s_types.Comp(self, 'comp', info, {})
self.addBaseType(item)
info = {'doc': 'The base geo political location type.'}
item = s_types.Loc(self, 'loc', info, {})
self.addBaseType(item)
info = {'doc': 'The node definition type for a (form,valu) compound field.'}
item = s_types.Ndef(self, 'ndef', info, {})
self.addBaseType(item)
info = {'doc': 'A typed array which indexes each field.'}
item = s_types.Array(self, 'array', info, {'type': 'int'})
self.addBaseType(item)
info = {'doc': 'An digraph edge base type.'}
item = s_types.Edge(self, 'edge', info, {})
self.addBaseType(item)
info = {'doc': 'An digraph edge base type with a unique time.'}
item = s_types.TimeEdge(self, 'timeedge', info, {})
self.addBaseType(item)
info = {'doc': 'Arbitrary json compatible data.'}
item = s_types.Data(self, 'data', info, {})
self.addBaseType(item)
info = {'doc': 'The nodeprop type for a (prop,valu) compound field.'}
item = s_types.NodeProp(self, 'nodeprop', info, {})
self.addBaseType(item)
info = {'doc': 'A potentially huge/tiny number. [x] <= 730750818665451459101842 with a fractional '
'precision of 24 decimal digits.'}
item = s_types.HugeNum(self, 'hugenum', info, {})
self.addBaseType(item)
info = {'doc': 'A component of a hierarchical taxonomy.'}
item = s_types.Taxon(self, 'taxon', info, {})
self.addBaseType(item)
info = {'doc': 'A hierarchical taxonomy.'}
item = s_types.Taxonomy(self, 'taxonomy', info, {})
self.addBaseType(item)
info = {'doc': 'A velocity with base units in mm/sec.'}
item = s_types.Velocity(self, 'velocity', info, {})
self.addBaseType(item)
# add the base universal properties...
self.addUnivProp('seen', ('ival', {}), {
'doc': 'The time interval for first/last observation of the node.',
})
self.addUnivProp('created', ('time', {'ismin': True}), {
'ro': True,
'doc': 'The time the node was created in the cortex.',
})
[docs]
def getPropsByType(self, name):
props = self.propsbytype.get(name)
if props is None:
return ()
# TODO order props based on score...
return list(props.values())
[docs]
def getArrayPropsByType(self, name):
props = self.arraysbytype.get(name)
if props is None:
return ()
return list(props.values())
[docs]
def getProps(self):
return [pobj for pname, pobj in self.props.items()
if not (isinstance(pname, tuple))]
[docs]
def reqProp(self, name, extra=None):
prop = self.prop(name)
if prop is not None:
return prop
exc = s_exc.NoSuchProp.init(name)
if extra is not None:
exc = extra(exc)
raise exc
[docs]
def reqUniv(self, name):
prop = self.univ(name)
if prop is not None:
return prop
mesg = f'No universal property named {name}.'
raise s_exc.NoSuchUniv(mesg=mesg, name=name)
[docs]
def reqTagProp(self, name):
prop = self.getTagProp(name)
if prop is not None:
return prop
mesg = f'No tag property named {name}.'
raise s_exc.NoSuchTagProp(mesg=mesg, name=name)
[docs]
def reqPropsByLook(self, name, extra=None):
if (forms := self.formsbyiface.get(name)) is not None:
return forms
if (props := self.ifaceprops.get(name)) is not None:
return props
if name.endswith('*'):
return self.reqFormsByPrefix(name[:-1], extra=extra)
exc = s_exc.NoSuchProp.init(name)
if extra is not None:
exc = extra(exc)
raise exc
[docs]
def getTypeClone(self, typedef):
base = self.types.get(typedef[0])
if base is None:
raise s_exc.NoSuchType(name=typedef[0])
return base.clone(typedef[1])
[docs]
def getModelDefs(self):
'''
Returns:
A list of one model definition compatible with addDataModels that represents the current data model
'''
mdef = self._modeldef.copy()
# dynamically generate form defs due to extended props
mdef['forms'] = [f.getFormDef() for f in self.forms.values()]
mdef['univs'] = [u.getPropDef() for u in self.univs.values()]
mdef['tagprops'] = [t.getTagPropDef() for t in self.tagprops.values()]
mdef['interfaces'] = list(self.ifaces.items())
mdef['edges'] = [e.pack() for e in self.edges.values()]
return [('all', mdef)]
[docs]
def getModelDict(self):
retn = {
'types': {},
'forms': {},
'edges': [],
'univs': {},
'tagprops': {},
'interfaces': self.ifaces.copy()
}
for tobj in self.types.values():
retn['types'][tobj.name] = tobj.pack()
for fobj in self.forms.values():
retn['forms'][fobj.name] = fobj.pack()
for uobj in self.univs.values():
retn['univs'][uobj.name] = uobj.pack()
for pobj in self.tagprops.values():
retn['tagprops'][pobj.name] = pobj.pack()
for eobj in self.edges.values():
retn['edges'].append(eobj.pack())
return retn
[docs]
def addDataModels(self, mods):
'''
Add a list of (name, mdef) tuples.
A model definition (mdef) is structured as follows::
{
"ctors":(
('name', 'class.path.ctor', {}, {'doc': 'The foo thing.'}),
),
"types":(
('name', ('basetype', {typeopts}), {info}),
),
"forms":(
(formname, (typename, typeopts), {info}, (
(propname, (typename, typeopts), {info}),
)),
),
"univs":(
(propname, (typename, typeopts), {info}),
)
"tagprops":(
(tagpropname, (typename, typeopts), {info}),
)
"interfaces":(
(ifacename, {
'props': ((propname, (typename, typeopts), {info}),),
'doc': docstr,
'interfaces': (ifacename,)
}),
)
}
Args:
mods (list): The list of tuples.
Returns:
None
'''
self.modeldefs.extend(mods)
# load all the base type ctors in order...
for _, mdef in mods:
for name, ctor, opts, info in mdef.get('ctors', ()):
item = s_dyndeps.tryDynFunc(ctor, self, name, info, opts)
self.types[name] = item
self._modeldef['ctors'].append((name, ctor, opts, info))
# load all the types in order...
for _, mdef in mods:
custom = mdef.get('custom', False)
for typename, (basename, typeopts), typeinfo in mdef.get('types', ()):
typeinfo['custom'] = custom
self.addType(typename, basename, typeopts, typeinfo)
# load all the interfaces...
for _, mdef in mods:
for name, info in mdef.get('interfaces', ()):
self.addIface(name, info)
# Load all the universal properties
for _, mdef in mods:
for univname, typedef, univinfo in mdef.get('univs', ()):
univinfo['custom'] = custom
self.addUnivProp(univname, typedef, univinfo)
# Load all the tagprops
for _, mdef in mods:
for tpname, typedef, tpinfo in mdef.get('tagprops', ()):
self.addTagProp(tpname, typedef, tpinfo)
# now we can load all the forms...
for _, mdef in mods:
for formname, forminfo, propdefs in mdef.get('forms', ()):
self.addForm(formname, forminfo, propdefs, checks=False)
# now we can load edge definitions...
for _, mdef in mods:
for etype, einfo in mdef.get('edges', ()):
self.addEdge(etype, einfo)
# now we can check the forms display settings...
for form in self.forms.values():
self._checkFormDisplay(form)
[docs]
def addEdge(self, edgetype, edgeinfo):
n1form, verb, n2form = edgetype
if n1form is not None:
self._reqFormName(n1form)
if n2form is not None:
self._reqFormName(n2form)
if not isinstance(verb, str):
mesg = f'Edge definition verb must be a string: {edgetype}.'
raise s_exc.BadArg(mesg=mesg)
if self.edges.get(edgetype) is not None:
mesg = f'Duplicate edge declared: {edgetype}.'
raise s_exc.BadArg(mesg=mesg)
edge = Edge(self, edgetype, edgeinfo)
self.edges[edgetype] = edge
self.edgesbyn1[n1form].add(edge)
self.edgesbyn2[n2form].add(edge)
[docs]
def delEdge(self, edgetype):
if self.edges.get(edgetype) is None:
return
n1form, verb, n2form = edgetype
self.edges.pop(edgetype, None)
self.edgesbyn1[n1form].discard(edgetype)
self.edgesbyn2[n2form].discard(edgetype)
def _reqFormName(self, name):
form = self.forms.get(name)
if form is None:
raise s_exc.NoSuchForm.init(name)
return form
[docs]
def addType(self, typename, basename, typeopts, typeinfo):
base = self.types.get(basename)
if base is None:
raise s_exc.NoSuchType(name=basename)
newtype = base.extend(typename, typeopts, typeinfo)
if newtype.deprecated and typeinfo.get('custom'):
mesg = f'The type {typename} is based on a deprecated type {newtype.name} which ' \
f'will be removed in 3.0.0.'
logger.warning(mesg)
self.types[typename] = newtype
self._modeldef['types'].append(newtype.getTypeDef())
def _checkFormDisplay(self, form):
formtype = self.types.get(form.full)
display = formtype.info.get('display')
if display is None:
return
for column in display.get('columns', ()):
coltype = column.get('type')
colopts = column.get('opts')
if coltype == 'prop':
curf = form
propname = colopts.get('name')
parts = propname.split('::')
for partname in parts:
prop = curf.prop(partname)
if prop is None:
mesg = (f'Form {form.name} defines prop column {propname}'
f' but {curf.full} has no property named {partname}.')
raise s_exc.BadFormDef(mesg=mesg)
curf = self.form(prop.type.name)
else:
mesg = f'Form {form.name} defines column with invalid type ({coltype}).'
raise s_exc.BadFormDef(mesg=mesg)
[docs]
def addIface(self, name, info):
# TODO should we add some meta-props here for queries?
self.ifaces[name] = info
[docs]
def delType(self, typename):
_type = self.types.get(typename)
if _type is None:
return
if self.propsbytype.get(typename):
mesg = f'Cannot delete type {typename} as it is still in use by properties.'
raise s_exc.CantDelType(mesg=mesg, name=typename)
for _type in self.types.values():
if typename in _type.info['bases']:
mesg = f'Cannot delete type {typename} as it is still in use by other types.'
raise s_exc.CantDelType(mesg=mesg, name=typename)
if _type.isarray and _type.arraytype.name == typename:
mesg = f'Cannot delete type {typename} as it is still in use by array types.'
raise s_exc.CantDelType(mesg=mesg, name=typename)
self.types.pop(typename, None)
self.propsbytype.pop(typename, None)
self.arraysbytype.pop(typename, None)
def _addFormUniv(self, form, name, tdef, info):
univ = self.reqUniv(name)
prop = Prop(self, form, name, tdef, info)
prop.locked = univ.locked
full = f'{form.name}{name}'
self.props[full] = prop
self.props[(form.name, name)] = prop
self.allunivs[name].append(prop)
[docs]
def addUnivProp(self, name, tdef, info):
base = '.' + name
univ = Prop(self, None, base, tdef, info)
if univ.type.deprecated:
mesg = f'The universal property {univ.full} is using a deprecated type {univ.type.name} which will' \
f' be removed in 3.0.0'
logger.warning(mesg)
self.props[base] = univ
self.univs[base] = univ
self.allunivs[base].append(univ)
for form in self.forms.values():
prop = self._addFormUniv(form, base, tdef, info)
[docs]
def getAllUnivs(self, name):
return list(self.allunivs.get(name, ()))
def _addFormProp(self, form, name, tdef, info):
prop = Prop(self, form, name, tdef, info)
# index the array item types
if isinstance(prop.type, s_types.Array):
self.arraysbytype[prop.type.arraytype.name][prop.full] = prop
self.props[prop.full] = prop
return prop
def _prepFormIface(self, form, iface):
template = iface.get('template', {})
template.update(form.type.info.get('template', {}))
def convert(item):
if isinstance(item, str):
if item == '$self':
return form.name
item = s_common.format(item, **template)
# warn but do not blow up. there may be extended model elements
# with {}s which are not used for templates...
if item.find('{') != -1: # pragma: no cover
logger.warning(f'Missing template specifier in: {item}')
return item
if isinstance(item, dict):
return {convert(k): convert(v) for (k, v) in item.items()}
if isinstance(item, (list, tuple)):
return tuple([convert(v) for v in item])
return item
return convert(iface)
def _addFormIface(self, form, name, subifaces=None):
iface = self.ifaces.get(name)
if iface is None:
mesg = f'Form {form.name} depends on non-existent interface: {name}'
raise s_exc.NoSuchName(mesg=mesg)
if iface.get('deprecated'):
mesg = f'Form {form.name} depends on deprecated interface {name} which will be removed in 3.0.0'
logger.warning(mesg)
iface = self._prepFormIface(form, iface)
for propname, typedef, propinfo in iface.get('props', ()):
# allow form props to take precedence
if form.prop(propname) is not None:
continue
prop = self._addFormProp(form, propname, typedef, propinfo)
self.ifaceprops[f'{name}:{propname}'].append(prop.full)
if subifaces is not None:
for subi in subifaces:
self.ifaceprops[f'{subi}:{propname}'].append(prop.full)
form.ifaces[name] = iface
self.formsbyiface[name].append(form.name)
if (ifaces := iface.get('interfaces')) is not None:
if subifaces is None:
subifaces = []
else:
subifaces = list(subifaces)
subifaces.append(name)
for ifname in ifaces:
self._addFormIface(form, ifname, subifaces=subifaces)
def _delFormIface(self, form, name, subifaces=None):
if (iface := self.ifaces.get(name)) is None:
return
iface = self._prepFormIface(form, iface)
for propname, typedef, propinfo in iface.get('props', ()):
fullprop = f'{form.name}:{propname}'
self.delFormProp(form.name, propname)
self.ifaceprops[f'{name}:{propname}'].remove(fullprop)
if subifaces is not None:
for subi in subifaces:
self.ifaceprops[f'{subi}:{propname}'].remove(fullprop)
form.ifaces.pop(name, None)
self.formsbyiface[name].remove(form.name)
if (ifaces := iface.get('interfaces')) is not None:
if subifaces is None:
subifaces = []
else:
subifaces = list(subifaces)
subifaces.append(name)
for ifname in ifaces:
self._delFormIface(form, ifname, subifaces=subifaces)
[docs]
def delTagProp(self, name):
return self.tagprops.pop(name)
[docs]
def addTagProp(self, name, tdef, info):
if name in self.tagprops:
raise s_exc.DupTagPropName(mesg=name)
prop = TagProp(self, name, tdef, info)
self.tagprops[name] = prop
if prop.type.deprecated:
mesg = f'The tag property {prop.name} is using a deprecated type {prop.type.name} which will' \
f' be removed in 3.0.0'
logger.warning(mesg)
return prop
[docs]
def getTagProp(self, name):
return self.tagprops.get(name)
[docs]
def delUnivProp(self, propname):
univname = '.' + propname
univ = self.props.pop(univname, None)
if univ is None:
raise s_exc.NoSuchUniv(name=propname)
self.univs.pop(univname, None)
self.allunivs.pop(univname, None)
for form in self.forms.values():
self.delFormProp(form.name, univname)
[docs]
def addBaseType(self, item):
'''
Add a Type instance to the data model.
'''
ctor = '.'.join([item.__class__.__module__, item.__class__.__qualname__])
self._modeldef['ctors'].append(((item.name, ctor, dict(item.opts), dict(item.info))))
self.types[item.name] = item
[docs]
def type(self, name):
'''
Return a synapse.lib.types.Type by name.
'''
return self.types.get(name)
[docs]
def prop(self, name):
return self.props.get(name)
[docs]
def univ(self, name):
return self.univs.get(name)
[docs]
def tagprop(self, name):
return self.tagprops.get(name)
[docs]
def edge(self, edgetype):
return self.edges.get(edgetype)