import asyncio
import synapse.exc as s_exc
import synapse.lib.ast as s_ast
import synapse.lib.cache as s_cache
import synapse.lib.stormctrl as s_stormctrl
import synapse.lib.stormtypes as s_stormtypes
# Largest cache size a caller may request; larger (or < 1) sizes are rejected with BadArg.
CACHE_SIZE_MAX = 10_000
# Cache size used when the caller does not pass an explicit size.
CACHE_SIZE_DEFAULT = 10_000
[docs]
@s_stormtypes.registry.registerLib
class LibCache(s_stormtypes.Lib):
    '''
    A Storm Library for interacting with Cache Objects.
    '''
    _storm_locals = (
        {'name': 'fixed', 'desc': '''
            Get a new Fixed Cache object.

            On a cache-miss when calling .get(), the callback Storm query is executed in a sub-runtime
            in the current execution context. A special variable, $cache_key, will be set
            to the key argument provided to .get().

            The callback Storm query must contain a return statement, and if it does not return a value
            when executed with the input, $lib.null will be set as the value.

            The fixed cache uses FIFO to evict items once the maximum size is reached.

            Examples:

                // Use a callback query with a function that modifies the outer runtime,
                // since it will run in the scope where it was defined.
                $test = foo

                function callback(key) {
                    $test = $key // this will modify $test in the outer runtime
                    return(`{$key}-val`)
                }

                $cache = $lib.cache.fixed(${ return($callback($cache_key)) })
                $value = $cache.get(bar)
                $lib.print($test) // this will equal "bar"

                // Use a callback query that will not modify the outer runtime,
                // except for variables accessible as references.
                $test = foo
                $tests = ([])

                $cache = $lib.cache.fixed(${
                    $test = $cache_key // this will *not* modify $test in the outer runtime
                    $tests.append($cache_key) // this will modify $tests in the outer runtime
                    return(`{$cache_key}-val`)
                })

                $value = $cache.get(bar)
                $lib.print($test) // this will equal "foo"
                $lib.print($tests) // this will equal (bar,)
            ''',
         'type': {'type': 'function', '_funcname': '_methFixedCache',
                  'args': (
                      {'name': 'callback', 'type': ['str', 'storm:query'],
                       'desc': 'A Storm query that will return a value for $cache_key on a cache miss.', },
                      {'name': 'size', 'type': 'int', 'default': CACHE_SIZE_DEFAULT,
                       'desc': 'The maximum size of the cache.', },
                  ),
                  'returns': {'type': 'cache:fixed', 'desc': 'A new ``cache:fixed`` object.'}}},
    )
    _storm_lib_path = ('cache',)

    def getObjLocals(self):
        '''
        Map the Storm-visible names of this library to their implementing methods.
        '''
        return {
            'fixed': self._methFixedCache,
        }

    @s_stormtypes.stormfunc(readonly=True)
    async def _methFixedCache(self, callback, size=CACHE_SIZE_DEFAULT):
        '''
        Create a FixedCache backed by a callback Storm query.

        Args:
            callback: Storm query (text or query object) run on a cache miss; coerced to str.
            size: Maximum number of cached items; coerced to int.

        Returns:
            FixedCache: A new cache bound to this library's runtime.

        Raises:
            s_exc.BadArg: If size is outside 1..CACHE_SIZE_MAX, the callback text does not
                parse, or the parsed query contains no return statement.
        '''
        size = await s_stormtypes.toint(size)
        callback = await s_stormtypes.tostr(callback)

        if size < 1 or size > CACHE_SIZE_MAX:
            raise s_exc.BadArg(mesg=f'Cache size must be between 1-{CACHE_SIZE_MAX}')

        try:
            query = await self.runt.getStormQuery(callback)
        except s_exc.BadSyntax as e:
            # Surface syntax problems as an argument error, keeping the parser error as the cause.
            raise s_exc.BadArg(mesg=f'Invalid callback query: {e.errinfo.get("mesg")}') from e

        # Reject queries that can never produce a value for the cache.
        if not query.hasAstClass(s_ast.Return):
            raise s_exc.BadArg(mesg='Callback query must return a value')

        return FixedCache(self.runt, query, size=size)
[docs]
@s_stormtypes.registry.registerType
class FixedCache(s_stormtypes.StormType):
    '''
    A StormLib API instance of a Storm Fixed Cache.
    '''
    _storm_locals = (
        {'name': 'query', 'desc': 'Get the callback Storm query as string.',
         'type': {'type': 'gtor', '_gtorfunc': '_gtorQuery',
                  'returns': {'type': 'str', 'desc': 'The callback Storm query text.', }}},
        {'name': 'get', 'desc': 'Get an item from the cache by key.',
         'type': {'type': 'function', '_funcname': '_methGet',
                  'args': (
                      {'name': 'key', 'type': 'any', 'desc': 'The key to lookup.'},
                  ),
                  'returns': {'type': 'any',
                              'desc': 'The value from the cache, or the value returned by the '
                                      'callback query if it does not exist', }}},
        {'name': 'pop', 'desc': 'Pop an item from the cache.',
         'type': {'type': 'function', '_funcname': '_methPop',
                  'args': (
                      {'name': 'key', 'type': 'any', 'desc': 'The key to pop.'},
                  ),
                  'returns': {'type': 'any',
                              'desc': 'The value from the cache, or $lib.null if it does not exist', }}},
        {'name': 'put', 'desc': 'Put an item into the cache.',
         'type': {'type': 'function', '_funcname': '_methPut',
                  'args': (
                      {'name': 'key', 'type': 'any', 'desc': 'The key put in the cache.'},
                      {'name': 'value', 'type': 'any', 'desc': 'The value to assign to the key.'},
                  ),
                  'returns': {'type': 'null', }}},
        {'name': 'clear', 'desc': 'Clear all items from the cache.',
         'type': {'type': 'function', '_funcname': '_methClear',
                  'returns': {'type': 'null', }}},
    )
    _storm_typename = 'cache:fixed'
    _ismutable = False

    def __init__(self, runt, query, size=CACHE_SIZE_DEFAULT):
        '''
        Args:
            runt: The Storm runtime the cache was created in; used to run the callback.
            query: The parsed callback Storm query.
            size: Size passed through to the underlying s_cache.FixedCache.
        '''
        s_stormtypes.StormType.__init__(self)
        self.runt = runt
        self.size = size
        self.query = query
        self.locls.update(self.getObjLocals())
        self.gtors.update({
            'query': self._gtorQuery,
        })
        # _runCallback is handed to the underlying cache as its callback.
        self.cache = s_cache.FixedCache(self._runCallback, size=size)

    def __len__(self):
        return len(self.cache)

    async def stormrepr(self):
        '''
        Return a short description of the cache, truncating the query text to 100 characters.
        '''
        if len(qtext := self.query.text) > 100:
            qtext = qtext[:100] + '...'
        return f'{self._storm_typename}: size={self.size} query="{qtext}"'

    def getObjLocals(self):
        '''
        Map the Storm-visible method names of this type to their implementations.
        '''
        return {
            'pop': self._methPop,
            'put': self._methPut,
            'get': self._methGet,
            'clear': self._methClear,
        }

    async def _gtorQuery(self):
        return self.query.text

    async def _runCallback(self, key):
        '''
        Execute the callback query for key and return the primitive form of its return value.

        The query runs in a command runtime seeded with the current scope variables
        plus ``cache_key`` set to key.

        Returns:
            The toprim()'d value passed to the query's return statement, or None if the
            query finishes without executing a return.

        Raises:
            s_exc.StormRuntimeError: If any other Storm control flow exception escapes the query.
        '''
        varz = self.runt.getScopeVars()
        varz['cache_key'] = key

        opts = {'vars': varz}
        async with self.runt.getCmdRuntime(self.query, opts=opts) as runt:
            try:
                async for _ in runt.execute():
                    # Yield to the event loop between items produced by the query.
                    await asyncio.sleep(0)
            except s_stormctrl.StormReturn as e:
                return await s_stormtypes.toprim(e.item)
            except s_stormctrl.StormCtrlFlow as e:
                # Prefer the exception's own statement name when it carries one.
                name = getattr(e, 'statement', e.__class__.__name__)
                exc = s_exc.StormRuntimeError(mesg=f'Storm control flow "{name}" not allowed in cache callbacks.')
                raise exc from None

        # The query completed without executing a return statement.
        return None

    async def _reqKey(self, key):
        '''
        Validate a cache key and return its primitive form.

        Raises:
            s_exc.BadArg: If s_stormtypes.ismutable() reports the key as mutable.
        '''
        if s_stormtypes.ismutable(key):
            mesg = 'Mutable values are not allowed as cache keys'
            raise s_exc.BadArg(mesg=mesg, name=await s_stormtypes.torepr(key))
        return await s_stormtypes.toprim(key)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methPop(self, key):
        '''
        Pop key from the underlying cache and return the result of its pop().
        '''
        key = await self._reqKey(key)
        return self.cache.pop(key)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methPut(self, key, value):
        '''
        Store the primitive form of value under the validated key.
        '''
        key = await self._reqKey(key)
        val = await s_stormtypes.toprim(value)
        self.cache.put(key, val)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methGet(self, key):
        '''
        Return the result of the underlying cache's aget() for the validated key.
        '''
        key = await self._reqKey(key)
        return await self.cache.aget(key)

    @s_stormtypes.stormfunc(readonly=True)
    async def _methClear(self):
        '''
        Remove every item from the underlying cache.
        '''
        self.cache.clear()