Source code for synapse.lib.stormlib.notifications

import synapse.exc as s_exc
import synapse.lib.stormtypes as s_stormtypes

[docs]@s_stormtypes.registry.registerLib class NotifyLib(s_stormtypes.Lib): '''A Storm library for a user interacting with their notifications.''' _storm_lib_path = ('notifications', ) _storm_locals = ( { 'name': 'list', 'desc': ''' Yield (<indx>, <mesg>) tuples for a user's notifications. ''', 'type': { 'type': 'function', '_funcname': 'list', 'args': ( {'name': 'size', 'type': 'int', 'desc': 'The max number of notifications to yield.', 'default': None}, ), 'returns': { 'name': 'Yields', 'type': 'list', 'desc': 'Yields (useriden, time, mesgtype, msgdata) tuples.'}, }, }, { 'name': 'del', 'desc': ''' Delete a previously delivered notification. ''', 'type': { 'type': 'function', '_funcname': '_del', 'args': ( {'name': 'indx', 'type': 'int', 'desc': 'The index number of the notification to delete.'}, ), 'returns': { 'name': 'retn', 'type': 'list', 'desc': 'Returns an ($ok, $valu) tuple.'}, }, }, { 'name': 'get', 'desc': ''' Return a notification by ID (or $lib.null). ''', 'type': { 'type': 'function', '_funcname': 'get', 'args': ( {'name': 'indx', 'type': 'int', 'desc': 'The index number of the notification to return.'}, ), 'returns': { 'name': 'retn', 'type': 'dict', 'desc': 'The requested notification or $lib.null.'}, }, }, )
[docs] def getObjLocals(self): return { 'get': self.get, 'del': self._del, 'list': self.list, # 'bytime': # 'bytype': }
[docs] @s_stormtypes.stormfunc(readonly=True) async def get(self, indx): indx = await s_stormtypes.toint(indx) mesg = await self.runt.snap.core.getUserNotif(indx) if mesg[0] != self.runt.user.iden and not self.runt.isAdmin(): mesg = 'You may only get notifications which belong to you.' raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) return mesg
async def _del(self, indx): indx = await s_stormtypes.toint(indx) mesg = await self.runt.snap.core.getUserNotif(indx) if mesg[0] != self.runt.user.iden and not self.runt.isAdmin(): mesg = 'You may only delete notifications which belong to you.' raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) await self.runt.snap.core.delUserNotif(indx)
[docs] @s_stormtypes.stormfunc(readonly=True) async def list(self, size=None): size = await s_stormtypes.toint(size, noneok=True) async for mesg in self.runt.snap.core.iterUserNotifs(self.runt.user.iden, size=size): yield mesg