import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.cache as s_cache
import synapse.lib.stormtypes as s_stormtypes
stormcmds = [
{
'name': 'model.edge.set',
'descr': 'Set a key-value for an edge verb that exists in the current view.',
'cmdargs': (
('verb', {'help': 'The edge verb to add a key to.'}),
('key', {'help': 'The key name (e.g. doc).'}),
('valu', {'help': 'The string value to set.'}),
),
'storm': '''
$verb = $cmdopts.verb
$key = $cmdopts.key
$lib.model.edge.set($verb, $key, $cmdopts.valu)
$lib.print('Set edge key: verb={verb} key={key}', verb=$verb, key=$key)
''',
},
{
'name': 'model.edge.get',
'descr': 'Retrieve key-value pairs for an edge verb in the current view.',
'cmdargs': (
('verb', {'help': 'The edge verb to retrieve.'}),
),
'storm': '''
$verb = $cmdopts.verb
$kvpairs = $lib.model.edge.get($verb)
if $kvpairs {
$lib.print('verb = {verb}', verb=$verb)
for ($key, $valu) in $kvpairs {
$lib.print(' {key} = {valu}', key=$key, valu=$valu)
}
} else {
$lib.print('verb={verb} contains no key-value pairs.', verb=$verb)
}
''',
},
{
'name': 'model.edge.del',
'descr': 'Delete a global key-value pair for an edge verb in the current view.',
'cmdargs': (
('verb', {'help': 'The edge verb to delete documentation for.'}),
('key', {'help': 'The key name (e.g. doc).'}),
),
'storm': '''
$verb = $cmdopts.verb
$key = $cmdopts.key
$lib.model.edge.del($verb, $key)
$lib.print('Deleted edge key: verb={verb} key={key}', verb=$verb, key=$key)
''',
},
{
'name': 'model.edge.list',
'descr': 'List all edge verbs in the current view and their doc key (if set).',
'storm': '''
$edgelist = $lib.model.edge.list()
if $edgelist {
$lib.print('\nname doc')
$lib.print('---- ---')
for ($verb, $kvdict) in $edgelist {
$verb = $verb.ljust(10)
$doc = $kvdict.doc
if ($doc=$lib.null) { $doc = '' }
$lib.print('{verb} {doc}', verb=$verb, doc=$doc)
}
$lib.print('')
} else {
$lib.print('No edge verbs found in the current view.')
}
''',
},
{
'name': 'model.deprecated.lock',
'descr': 'Edit lock status of deprecated model elements.',
'cmdargs': (
('name', {'help': 'The deprecated form or property name to lock or * to lock all.'}),
('--unlock', {'help': 'Unlock rather than lock the deprecated property.', 'default': False, 'action': 'store_true'}),
),
'storm': '''
init {
if $cmdopts.unlock {
$lib.print("Unlocking: {name}", name=$cmdopts.name)
$lib.model.deprecated.lock($cmdopts.name, $lib.false)
} else {
if ($cmdopts.name = "*") {
$lib.print("Locking all deprecated model elements.")
for ($name, $locked) in $lib.model.deprecated.locks() {
if (not $locked) { $lib.model.deprecated.lock($name, $lib.true) }
}
} else {
$lib.print("Locking: {name}", name=$cmdopts.name)
$lib.model.deprecated.lock($cmdopts.name, $lib.true)
}
}
}
''',
},
{
'name': 'model.deprecated.locks',
'descr': 'Display lock status of deprecated model elements.',
'storm': '''
$locks = $lib.model.deprecated.locks()
if $locks {
$lib.print("Lock status for deprecated forms/props:")
for ($name, $locked) in $lib.sorted($locks) {
$lib.print("{name}: {locked}", name=$name, locked=$locked)
}
} else {
$lib.print("No deprecated locks found.")
}
''',
},
{
'name': 'model.deprecated.check',
'descr': 'Check for lock status and the existence of deprecated model elements',
'storm': '''
init {
$ok = $lib.true
$lib.print("Checking the cortex for model flag day readiness...")
$locks = $lib.model.deprecated.locks()
$lib.print("Checking deprecated model locks:")
for ($name, $locked) in $locks {
if $locked {
$lib.print("{name} is locked", name=$name)
} else {
$lib.warn("{name} is not yet locked", name=$name)
$ok = $lib.false
}
}
$lib.print("Checking for existence of deprecated model elements:")
for ($name, $locked) in $locks {
$lib.print("{name}...", name=$name)
for $layr in $lib.layer.list() {
if $layr.getPropCount($name, maxsize=1) {
$lib.warn("Layer {iden} still contains {name}", iden=$layr.iden, name=$name)
$ok = $lib.false
}
}
}
if (not $ok) {
$lib.print("Your cortex contains deprecated model elements.")
} else {
$lib.print("Congrats! Your Cortex is fully future-model compliant!")
}
}
''',
},
]
[docs]@s_stormtypes.registry.registerLib
class LibModel(s_stormtypes.Lib):
'''
A Storm Library for interacting with the Data Model in the Cortex.
'''
_storm_lib_path = ('model',)
_storm_locals = (
{'name': 'type', 'desc': 'Get a type object by name.',
'type': {'type': 'function', '_funcname': '_methType',
'args': (
{'name': 'name', 'type': 'str', 'desc': 'The name of the type to retrieve.', },
),
'returns': {'type': ['model:type', 'null'],
'desc': 'The ``model:type`` instance if the type if present on the form or null.',
}}},
{'name': 'prop', 'desc': 'Get a prop object by name.',
'type': {'type': 'function', '_funcname': '_methProp',
'args': (
{'name': 'name', 'type': 'str', 'desc': 'The name of the prop to retrieve.', },
),
'returns': {'type': ['model:property', 'null'],
'desc': 'The ``model:property`` instance if the type if present or null.',
}}},
{'name': 'form', 'desc': 'Get a form object by name.',
'type': {'type': 'function', '_funcname': '_methForm',
'args': (
{'name': 'name', 'type': 'str', 'desc': 'The name of the form to retrieve.', },
),
'returns': {'type': ['model:form', 'null'],
'desc': 'The ``model:form`` instance if the form is present or null.',
}}},
{'name': 'tagprop', 'desc': 'Get a tag property object by name.',
'type': {'type': 'function', '_funcname': '_methTagProp',
'args': (
{'name': 'name', 'type': 'str', 'desc': 'The name of the tag prop to retrieve.', },
),
'returns': {'type': ['model:tagprop', 'null'],
'desc': 'The ``model:tagprop`` instance if the tag prop if present or null.',
}}},
)
def __init__(self, runt, name=()):
s_stormtypes.Lib.__init__(self, runt, name)
self.model = runt.model
[docs] def getObjLocals(self):
return {
'type': self._methType,
'prop': self._methProp,
'form': self._methForm,
'tagprop': self._methTagProp,
}
@s_cache.memoizemethod(size=100)
@s_stormtypes.stormfunc(readonly=True)
async def _methType(self, name):
name = await s_stormtypes.tostr(name)
type_ = self.model.type(name)
if type_ is not None:
return ModelType(type_)
@s_cache.memoizemethod(size=100)
@s_stormtypes.stormfunc(readonly=True)
async def _methProp(self, name):
name = await s_stormtypes.tostr(name)
prop = self.model.prop(name)
if prop is not None:
return ModelProp(prop)
@s_cache.memoizemethod(size=100)
@s_stormtypes.stormfunc(readonly=True)
async def _methForm(self, name):
name = await s_stormtypes.tostr(name)
form = self.model.form(name)
if form is not None:
return ModelForm(form)
@s_cache.memoize(size=100)
@s_stormtypes.stormfunc(readonly=True)
async def _methTagProp(self, name):
name = await s_stormtypes.tostr(name)
tagprop = self.model.getTagProp(name)
if tagprop is not None:
return ModelTagProp(tagprop)
[docs]@s_stormtypes.registry.registerType
class ModelProp(s_stormtypes.Prim):
'''
Implements the Storm API for a Property.
'''
_storm_locals = (
{'name': 'name', 'desc': 'The short name of the Property.', 'type': 'str', },
{'name': 'full', 'desc': 'The full name of the Property.', 'type': 'str', },
{'name': 'form', 'desc': 'Get the Form for the Property.',
'type': {'type': 'ctor', '_ctorfunc': '_ctorPropForm',
'returns': {'type': ['model:form', 'null']}}},
{'name': 'type', 'desc': 'Get the Type for the Property.',
'type': {'type': 'ctor', '_ctorfunc': '_ctorPropType',
'returns': {'type': 'model:type'}}},
)
_storm_typename = 'model:property'
def __init__(self, prop, path=None):
s_stormtypes.Prim.__init__(self, prop, path=path)
self.ctors.update({
'form': self._ctorPropForm,
'type': self._ctorPropType,
})
self.locls['name'] = self.valu.name
self.locls['full'] = self.valu.full
def _ctorPropType(self, path=None):
return ModelType(self.valu.type, path=path)
def _ctorPropForm(self, path=None):
if self.valu.form is None:
return None
return ModelForm(self.valu.form, path=path)
[docs] def value(self):
return self.valu.pack()
[docs]@s_stormtypes.registry.registerType
class ModelTagProp(s_stormtypes.Prim):
'''
Implements the Storm API for a Tag Property.
'''
_storm_locals = (
{'name': 'name', 'desc': 'The name of the Tag Property.', 'type': 'str', },
{'name': 'type', 'desc': 'Get the Type for the Tag Property.',
'type': {'type': 'ctor', '_ctorfunc': '_ctorTagPropType',
'returns': {'type': 'model:type'}}},
)
_storm_typename = 'model:tagprop'
def __init__(self, tagprop, path=None):
s_stormtypes.Prim.__init__(self, tagprop, path=path)
self.ctors.update({
'type': self._ctorTagPropType,
})
self.locls['name'] = self.valu.name
def _ctorTagPropType(self, path=None):
return ModelType(self.valu.type, path=path)
[docs] def value(self):
return self.valu.pack()
[docs]@s_stormtypes.registry.registerType
class ModelType(s_stormtypes.Prim):
'''
A Storm types wrapper around a lib.types.Type
'''
_storm_locals = (
{'name': 'name', 'desc': 'The name of the Type.', 'type': 'str', },
{'name': 'stortype', 'desc': 'The storetype of the Type.', 'type': 'int', },
{'name': 'repr', 'desc': 'Get the repr of a value for the Type.',
'type': {'type': 'function', '_funcname': '_methRepr',
'args': (
{'name': 'valu', 'desc': 'The value to get the repr of.', 'type': 'any', },
),
'returns': {'desc': 'The string form of the value as represented by the type.', 'type': 'str', }}},
{'name': 'norm', 'desc': 'Get the norm and info for the Type.',
'type': {'type': 'function', '_funcname': '_methNorm',
'args': (
{'name': 'valu', 'desc': 'The value to norm.', 'type': 'any', },
),
'returns': {'desc': 'A tuple of the normed value and its information dictionary.', 'type': 'list'}}},
)
_storm_typename = 'model:type'
def __init__(self, valu, path=None):
s_stormtypes.Prim.__init__(self, valu, path=path)
self.locls.update(self.getObjLocals())
self.locls.update({'name': valu.name,
'stortype': valu.stortype,
})
[docs] def getObjLocals(self):
return {
'norm': self._methNorm,
'repr': self._methRepr,
}
@s_stormtypes.stormfunc(readonly=True)
async def _methRepr(self, valu):
nval = self.valu.norm(valu)
return self.valu.repr(nval[0])
@s_stormtypes.stormfunc(readonly=True)
async def _methNorm(self, valu):
return self.valu.norm(valu)
[docs] def value(self):
return self.valu.getTypeDef()
[docs]@s_stormtypes.registry.registerLib
class LibModelEdge(s_stormtypes.Lib):
'''
A Storm Library for interacting with light edges and manipulating their key-value attributes.
'''
_storm_locals = (
{'name': 'get', 'desc': 'Get the key-value data for a given Edge verb.',
'type': {'type': 'function', '_funcname': '_methEdgeGet',
'args': (
{'name': 'verb', 'desc': 'The Edge verb to look up.', 'type': 'str', },
),
'returns': {'type': 'dict', 'desc': 'A dictionary representing the key-value data set on a verb.', }}},
{'name': 'validkeys', 'desc': 'Get a list of the valid keys that can be set on an Edge verb.',
'type': {'type': 'function', '_funcname': '_methValidKeys',
'returns': {'type': 'list', 'desc': 'A list of the valid keys.', }
}
},
{'name': 'set', 'desc': 'Set a key-value for a given Edge verb.',
'type': {'type': 'function', '_funcname': '_methEdgeSet',
'args': (
{'name': 'verb', 'type': 'str', 'desc': 'The Edge verb to set a value for.', },
{'name': 'key', 'type': 'str', 'desc': 'The key to set.', },
{'name': 'valu', 'type': 'str', 'desc': 'The value to set.', },
),
'returns': {'type': 'null', }}},
{'name': 'del', 'desc': 'Delete a key from the key-value store for a verb.',
'type': {'type': 'function', '_funcname': '_methEdgeDel',
'args': (
{'name': 'verb', 'type': 'str', 'desc': 'The name of the Edge verb to remove a key from.', },
{'name': 'key', 'type': 'str', 'desc': 'The name of the key to remove from the key-value store.', },
),
'returns': {'type': 'null', }}},
{'name': 'list', 'desc': 'Get a list of (verb, key-value dictionary) pairs for Edge verbs in the current Cortex View.',
'type': {'type': 'function', '_funcname': '_methEdgeList',
'returns': {'type': 'list', 'desc': 'A list of (str, dict) tuples for each verb in the current Cortex View.', }}},
)
# Note: The use of extprops in hive paths in this class is an artifact of the
# original implementation which used extended property language which had a
# very bad cognitive overload with the cortex extended properties, but we
# don't want to change underlying data. epiphyte 20200703
# restrict list of keys which we allow to be set/del through this API.
validedgekeys = (
'doc',
)
hivepath = ('cortex', 'model', 'edges')
_storm_lib_path = ('model', 'edge')
def __init__(self, runt, name=()):
s_stormtypes.Lib.__init__(self, runt, name)
[docs] def getObjLocals(self):
return {
'get': self._methEdgeGet,
'set': self._methEdgeSet,
'del': self._methEdgeDel,
'list': self._methEdgeList,
'validkeys': self._methValidKeys,
}
async def _chkEdgeVerbInView(self, verb):
async for vverb in self.runt.snap.view.getEdgeVerbs():
if vverb == verb:
return
raise s_exc.NoSuchName(mesg=f'No such edge verb in the current view', name=verb)
async def _chkKeyName(self, key):
if key not in self.validedgekeys:
raise s_exc.NoSuchProp(mesg=f'The requested key is not valid for light edge metadata.',
name=key)
@s_stormtypes.stormfunc(readonly=True)
def _methValidKeys(self):
s_common.deprecated('model.edge.validkeys', curv='2.165.0')
return self.validedgekeys
@s_stormtypes.stormfunc(readonly=True)
async def _methEdgeGet(self, verb):
s_common.deprecated('model.edge.get', curv='2.165.0')
verb = await s_stormtypes.tostr(verb)
await self._chkEdgeVerbInView(verb)
path = self.hivepath + (verb, 'extprops')
return await self.runt.snap.core.getHiveKey(path) or {}
async def _methEdgeSet(self, verb, key, valu):
s_common.deprecated('model.edge.set', curv='2.165.0')
verb = await s_stormtypes.tostr(verb)
await self._chkEdgeVerbInView(verb)
key = await s_stormtypes.tostr(key)
await self._chkKeyName(key)
valu = await s_stormtypes.tostr(valu)
path = self.hivepath + (verb, 'extprops')
kvdict = await self.runt.snap.core.getHiveKey(path) or {}
kvdict[key] = valu
await self.runt.snap.core.setHiveKey(path, kvdict)
async def _methEdgeDel(self, verb, key):
s_common.deprecated('model.edge.del', curv='2.165.0')
verb = await s_stormtypes.tostr(verb)
await self._chkEdgeVerbInView(verb)
key = await s_stormtypes.tostr(key)
await self._chkKeyName(key)
path = self.hivepath + (verb, 'extprops')
kvdict = await self.runt.snap.core.getHiveKey(path) or {}
oldv = kvdict.pop(key, None)
if oldv is None:
raise s_exc.NoSuchProp(mesg=f'Key is not set for this edge verb',
verb=verb, name=key)
await self.runt.snap.core.setHiveKey(path, kvdict)
@s_stormtypes.stormfunc(readonly=True)
async def _methEdgeList(self):
s_common.deprecated('model.edge.list', curv='2.165.0')
retn = []
async for verb in self.runt.snap.view.getEdgeVerbs():
path = self.hivepath + (verb, 'extprops')
kvdict = await self.runt.snap.core.getHiveKey(path) or {}
retn.append((verb, kvdict))
return retn
[docs]@s_stormtypes.registry.registerLib
class LibModelDeprecated(s_stormtypes.Lib):
'''
A storm library for interacting with the model deprecation mechanism.
'''
_storm_locals = (
{'name': 'lock', 'desc': 'Set the locked property for a deprecated model element.',
'type': {'type': 'function', '_funcname': '_lock',
'args': (
{'name': 'name', 'desc': 'The full path of the model element to lock.', 'type': 'str', },
{'name': 'locked', 'desc': 'The lock status.', 'type': 'boolean', },
),
'returns': {'type': 'null', }}},
{'name': 'locks', 'desc': 'Get a dictionary of the data model elements which are deprecated and their lock status in the Cortex.',
'type': {'type': 'function', '_funcname': '_locks',
'returns': {'type': 'dict', 'desc': 'A dictionary of named elements to their boolean lock values.', }}},
)
_storm_lib_path = ('model', 'deprecated')
[docs] def getObjLocals(self):
return {
'lock': self._lock,
'locks': self._locks,
}
@s_stormtypes.stormfunc(readonly=True)
async def _locks(self):
todo = s_common.todo('getDeprLocks')
locks = await self.runt.dyncall('cortex', todo)
return s_stormtypes.Dict(locks)
async def _lock(self, name, locked):
name = await s_stormtypes.tostr(name)
locked = await s_stormtypes.tobool(locked)
todo = s_common.todo('setDeprLock', name, locked)
gatekeys = ((self.runt.user.iden, ('model', 'deprecated', 'lock'), None),)
await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)