Source code for synapse.lib.jsonstor

import os
import asyncio
import logging

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.cell as s_cell
import synapse.lib.base as s_base
import synapse.lib.nexus as s_nexus
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab

logger = logging.getLogger(__name__)

[docs]class JsonStor(s_base.Base): ''' A filesystem like storage mechanism that allows hirarchical lookup of reference counted "objects" that have individually editable properties. #TODO json validation by path glob matches? (persists?) #TODO GUID ACCESS with index generation by type #TODO registered types jsonschema with optional write-back validation ''' async def __anit__(self, slab, pref): await s_base.Base.__anit__(self) self.slab = slab self.pref = pref self.dirty = {} self.pathdb = self.slab.initdb(f'{pref}:paths') self.itemdb = self.slab.initdb(f'{pref}:items') self.metadb = self.slab.initdb(f'{pref}:meta') self.fsinfo = self.slab.initdb(f'{pref}:fsinfo') self.slab.on('commit', self._syncDirtyItems) async def _syncDirtyItems(self, mesg): todo = list(self.dirty.items()) for buid, item in todo: self.slab.put(buid, s_msgpack.en(item), db=self.itemdb) self.dirty.pop(buid, None) await asyncio.sleep(0) def _incRefObj(self, buid, valu=1): refs = 0 refsbyts = self.slab.get(buid + b'refs', db=self.metadb) if refsbyts: refs = s_msgpack.un(refsbyts) refs += valu if refs > 0: self.slab.put(buid + b'refs', s_msgpack.en(refs), db=self.metadb) return refs # remove the meta entries for lkey, byts in self.slab.scanByPref(buid, db=self.metadb): self.slab.pop(lkey, db=self.metadb) # remove the item data self.slab.pop(buid, db=self.itemdb) self.dirty.pop(buid, None)
[docs] async def copyPathObj(self, oldp, newp): item = await self.getPathObj(oldp) await self.setPathObj(newp, item)
[docs] async def copyPathObjs(self, paths): for oldp, newp in paths: await self.copyPathObj(oldp, newp) await asyncio.sleep(0)
[docs] async def setPathObj(self, path, item): ''' Set (and/or reinitialize) the object at the given path. NOTE: This will break any links by creating a new object. ''' buid = os.urandom(16) pkey = self._pathToPkey(path) oldb = self.slab.replace(pkey, buid, db=self.pathdb) if oldb is not None: self._incRefObj(oldb, -1) self.slab.put(buid + b'refs', s_msgpack.en(1), db=self.metadb) self.dirty[buid] = item
[docs] async def getPathObj(self, path): buid = self._pathToBuid(path) if buid is None: return None return self._getBuidItem(buid)
def _getBuidItem(self, buid): item = self.dirty.get(buid) if item is not None: return item byts = self.slab.get(buid, db=self.itemdb) if byts is not None: return s_msgpack.un(byts) def _pathToBuid(self, path): pkey = self._pathToPkey(path) return self.slab.get(pkey, db=self.pathdb)
[docs] async def hasPathObj(self, path): pkey = self._pathToPkey(path) return self.slab.has(pkey, db=self.pathdb)
[docs] async def delPathObj(self, path): ''' Remove a path and decref the object it references. ''' pkey = self._pathToPkey(path) buid = self.slab.pop(pkey, db=self.pathdb) if buid is not None: self._incRefObj(buid, valu=-1)
[docs] async def getPathObjProp(self, path, prop): item = await self.getPathObj(path) if item is None: return None for name in self._pathToTupl(prop): item = item.get(name, s_common.novalu) if item is s_common.novalu: return None return item
def _pathToPkey(self, path): path = self._pathToTupl(path) return ('\x00'.join(path)).encode() def _pathToTupl(self, path): if isinstance(path, str): path = tuple(path.split('/')) return path def _tuplToPath(self, path): return '/'.join(path) def _pkeyToTupl(self, pkey): return tuple(pkey.decode().split('\x00'))
[docs] async def getPathList(self, path): path = self._pathToTupl(path) plen = len(path) pkey = self._pathToPkey(path) for lkey, buid in self.slab.scanByPref(pkey, db=self.pathdb): yield self._tuplToPath(self._pkeyToTupl(lkey)[plen:])
[docs] async def getPathObjs(self, path): path = self._pathToTupl(path) plen = len(path) pkey = self._pathToPkey(path) for lkey, buid in self.slab.scanByPref(pkey, db=self.pathdb): yield self._pkeyToTupl(lkey)[plen:], self._getBuidItem(buid)
[docs] async def setPathObjProp(self, path, prop, valu): buid = self._pathToBuid(path) if buid is None: return False item = self._getBuidItem(buid) if item is None: return False step = item names = self._pathToTupl(prop) for name in names[:-1]: down = step.get(name) if down is None: down = step[name] = {} step = down name = names[-1] if step.get(name, s_common.novalu) == valu: return True step[name] = valu self.dirty[buid] = item return True
[docs] async def delPathObjProp(self, path, prop): buid = self._pathToBuid(path) if buid is None: return False item = self._getBuidItem(buid) if item is None: return False step = item names = self._pathToTupl(prop) for name in names[:-1]: step = step[name] step.pop(names[-1], None) self.dirty[buid] = item return True
[docs] async def cmpDelPathObjProp(self, path, prop, valu): buid = self._pathToBuid(path) if buid is None: return False item = self._getBuidItem(buid) if item is None: return False step = item names = self._pathToTupl(prop) for name in names[:-1]: step = step[name] name = names[-1] if step.get(name) != valu: return False step.pop(name, None) self.dirty[buid] = item return True
[docs] async def popPathObjProp(self, path, prop, defv=None): buid = self._pathToBuid(path) if buid is None: return defv item = self._getBuidItem(buid) if item is None: return defv step = item names = self._pathToTupl(prop) for name in names[:-1]: step = step.get(name, s_common.novalu) if step is s_common.novalu: return defv retn = step.pop(names[-1], defv) self.dirty[buid] = item return retn
[docs]class JsonStorApi(s_cell.CellApi):
[docs] async def popPathObjProp(self, path, prop): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'set', *path)) return await self.cell.popPathObjProp(path, prop)
[docs] async def hasPathObj(self, path): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'get', *path)) return await self.cell.hasPathObj(path)
[docs] async def copyPathObj(self, oldp, newp): oldp = self.cell.jsonstor._pathToTupl(oldp) newp = self.cell.jsonstor._pathToTupl(newp) await self._reqUserAllowed(('json', 'get', *oldp)) await self._reqUserAllowed(('json', 'set', *newp)) return await self.cell.copyPathObj(oldp, newp)
[docs] async def copyPathObjs(self, paths): pathnorms = [] for oldp, newp in paths: oldp = self.cell.jsonstor._pathToTupl(oldp) newp = self.cell.jsonstor._pathToTupl(newp) await self._reqUserAllowed(('json', 'get', *oldp)) await self._reqUserAllowed(('json', 'set', *newp)) pathnorms.append((oldp, newp)) return await self.cell.copyPathObjs(pathnorms)
[docs] async def getPathList(self, path): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'list', *path)) async for item in self.cell.getPathList(path): yield item
[docs] async def getPathObj(self, path): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'get', *path)) return await self.cell.getPathObj(path)
[docs] async def getPathObjs(self, path): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'get', *path)) async for item in self.cell.getPathObjs(path): yield item
[docs] async def setPathObj(self, path, item): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'set', *path)) return await self.cell.setPathObj(path, item)
[docs] async def delPathObj(self, path): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'del', *path)) return await self.cell.delPathObj(path)
[docs] async def delPathObjProp(self, path, name): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'set', *path)) return await self.cell.delPathObjProp(path, name)
[docs] async def cmpDelPathObjProp(self, path, name, valu): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'set', *path)) return await self.cell.cmpDelPathObjProp(path, name, valu)
[docs] async def getPathObjProp(self, path, prop): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'get', *path)) return await self.cell.getPathObjProp(path, prop)
[docs] async def setPathObjProp(self, path, prop, valu): path = self.cell.jsonstor._pathToTupl(path) await self._reqUserAllowed(('json', 'set', *path)) return await self.cell.setPathObjProp(path, prop, valu)
[docs] async def addQueue(self, name, info): await self._reqUserAllowed(('queue', 'add', name)) info['owner'] = self.user.iden info['created'] = s_common.now() return await self.cell.addQueue(name, info)
[docs] async def delQueue(self, name): await self._reqUserAllowed(('queue', 'del', name)) return await self.cell.delQueue(name)
[docs] async def cullQueue(self, name, offs): await self._reqUserAllowed(('queue', 'gets', name)) return await self.cell.cullQueue(name, offs)
[docs] async def putsQueue(self, name, items): await self._reqUserAllowed(('queue', 'puts', name)) return await self.cell.putsQueue(name, items)
[docs] async def getsQueue(self, name, offs, size=None, cull=True, wait=True): await self._reqUserAllowed(('queue', 'gets', name)) async for item in self.cell.getsQueue(name, offs, size=size, cull=cull, wait=wait): yield item
[docs] @s_cell.adminapi() async def addUserNotif(self, useriden, mesgtype, mesgdata=None): return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)
[docs] @s_cell.adminapi() async def getUserNotif(self, indx): return await self.cell.getUserNotif(indx)
[docs] @s_cell.adminapi() async def delUserNotif(self, indx): return await self.cell.delUserNotif(indx)
[docs] @s_cell.adminapi() async def iterUserNotifs(self, useriden, size=None): async for item in self.cell.iterUserNotifs(useriden, size=size): yield item
[docs] @s_cell.adminapi() async def watchAllUserNotifs(self, offs=None): async for item in self.cell.watchAllUserNotifs(offs=offs): yield item
[docs]class JsonStorCell(s_cell.Cell): cellapi = JsonStorApi
[docs] async def initServiceStorage(self): self.jsonstor = await JsonStor.anit(self.slab, 'jsonstor') self.multique = await s_lmdbslab.MultiQueue.anit(self.slab, 'multique') self.onfini(self.jsonstor.fini) self.onfini(self.multique.fini) self.notif_abrv_user = self.slab.getNameAbrv('users') self.notif_abrv_type = self.slab.getNameAbrv('types') self.notifseqn = self.slab.getSeqn('notifs') self.notif_indx_usertime = self.slab.initdb('indx:user:time', dupsort=True) self.notif_indx_usertype = self.slab.initdb('indx:user:type', dupsort=True)
[docs] @classmethod def getEnvPrefix(cls): return (f'SYN_JSONSTOR', f'SYN_{cls.__name__.upper()}', )
[docs] async def getPathList(self, path): async for item in self.jsonstor.getPathList(path): yield item
[docs] @s_nexus.Pusher.onPushAuto('json:pop:prop') async def popPathObjProp(self, path, prop): return await self.jsonstor.popPathObjProp(path, prop)
[docs] async def hasPathObj(self, path): return await self.jsonstor.hasPathObj(path)
[docs] @s_nexus.Pusher.onPushAuto('json:copy') async def copyPathObj(self, oldp, newp): return await self.jsonstor.copyPathObj(oldp, newp)
[docs] @s_nexus.Pusher.onPushAuto('json:copys') async def copyPathObjs(self, paths): return await self.jsonstor.copyPathObjs(paths)
[docs] async def getPathObj(self, path): return await self.jsonstor.getPathObj(path)
[docs] async def getPathObjs(self, path): async for item in self.jsonstor.getPathObjs(path): yield item await asyncio.sleep(0)
[docs] async def getPathObjProp(self, path, prop): return await self.jsonstor.getPathObjProp(path, prop)
[docs] @s_nexus.Pusher.onPushAuto('json:set') async def setPathObj(self, path, item): return await self.jsonstor.setPathObj(path, item)
[docs] @s_nexus.Pusher.onPushAuto('json:del') async def delPathObj(self, path): return await self.jsonstor.delPathObj(path)
[docs] @s_nexus.Pusher.onPushAuto('json:del:prop') async def delPathObjProp(self, path, name): return await self.jsonstor.delPathObjProp(path, name)
[docs] @s_nexus.Pusher.onPushAuto('json:cmp:del:prop') async def cmpDelPathObjProp(self, path, name, valu): return await self.jsonstor.cmpDelPathObjProp(path, name, valu)
[docs] @s_nexus.Pusher.onPushAuto('json:set:prop') async def setPathObjProp(self, path, prop, valu): return await self.jsonstor.setPathObjProp(path, prop, valu)
[docs] @s_nexus.Pusher.onPushAuto('q:add') async def addQueue(self, name, info): if not self.multique.exists(name): await self.multique.add(name, info) return True return False
[docs] @s_nexus.Pusher.onPushAuto('q:del') async def delQueue(self, name): if not self.multique.exists(name): return False await self.multique.rem(name) return True
[docs] async def putsQueue(self, name, items): return await self._push('q:puts', name, items)
@s_nexus.Pusher.onPush('q:puts', passitem=True) async def _putsQueue(self, name, items, nexsitem): nexsoff, nexsmesg = nexsitem return await self.multique.puts(name, items, reqid=nexsoff)
[docs] @s_nexus.Pusher.onPushAuto('q:cull') async def cullQueue(self, name, offs): return await self.multique.cull(name, offs)
[docs] async def getsQueue(self, name, offs, size=None, cull=True, wait=True): if cull and offs > 0: await self.cullQueue(name, offs - 1) async for item in self.multique.gets(name, offs, size=size, wait=wait): yield item
[docs] async def addUserNotif(self, useriden, mesgtype, mesgdata=None): mesg = (useriden, s_common.now(), mesgtype, mesgdata) return await self._push('notif:add', mesg)
[docs] async def getUserNotif(self, indx): return self.notifseqn.get(indx)
@s_nexus.Pusher.onPush('notif:add', passitem=True) async def _addUserNotif(self, mesg, nexsitem): indx = self.notifseqn.add(mesg, indx=nexsitem[0]) indxbyts = s_common.int64en(indx) useriden, mesgtime, mesgtype, mesgdata = mesg userbyts = s_common.uhex(useriden) timebyts = s_common.int64en(mesgtime) typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode()) self.slab.put(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime, dupdata=True) self.slab.put(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype, dupdata=True) return indx
[docs] @s_nexus.Pusher.onPushAuto('notif:del') async def delUserNotif(self, indx): envl = self.notifseqn.pop(indx) if envl is None: return mesg = envl[1] useriden, mesgtime, mesgtype, mesgdata = mesg indxbyts = s_common.int64en(indx) userbyts = s_common.uhex(useriden) timebyts = s_common.int64en(mesgtime) typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode()) self.slab.delete(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime) self.slab.delete(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype)
[docs] async def iterUserNotifs(self, useriden, size=None): # iterate user notifications backward userbyts = s_common.uhex(useriden) count = 0 for _, indxbyts in self.slab.scanByPrefBack(userbyts, db=self.notif_indx_usertype): indx = s_common.int64un(indxbyts) mesg = self.notifseqn.getraw(indxbyts) yield (indx, mesg) count += 1 if size is not None and count >= size: break
[docs] async def watchAllUserNotifs(self, offs=None): # yield only new notifications as they arrive if offs is None: offs = self.notifseqn.index() async for item in self.notifseqn.aiter(offs=offs, wait=True, timeout=120): yield item
# async def iterUserNotifsByTime(self, useriden, ival): # async def iterUserNotifsByType(self, useriden, mesgtype, ival=None):