Source code for synapse.lib.slabseqn

import heapq
import asyncio

import synapse.common as s_common

import synapse.lib.coro as s_coro
import synapse.lib.msgpack as s_msgpack

class SlabSeqn:
    '''
    An append optimized sequence of byte blobs.

    Args:
        slab (synapse.lib.lmdbslab.Slab): The LMDB slab to store the sequence in.
        name (str): The name of the sequence.
    '''
    def __init__(self, slab, name: str) -> None:

        self.slab = slab
        self.db = self.slab.initdb(name)

        self.indx = self.nextindx()
        self.addevents = []
        self.offsevents = []  # type: ignore # List[Tuple[int, int, asyncio.Event]] as a heap
        self._waitcounter = 0

        # NOTE: This is intended to be publicly accessible
        # and therefore must always represent the true size.
        self.size = self.stat()['entries']

    def _wake_waiters(self):
        for evnt in self.addevents:
            evnt.set()

        while self.offsevents and self.offsevents[0][0] < self.indx:
            _, _, evnt = heapq.heappop(self.offsevents)
            evnt.set()

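    # A minimal construction sketch (hypothetical usage; assumes an already
    # open synapse.lib.lmdbslab.Slab instance named ``slab``):
    #
    #     seqn = SlabSeqn(slab, 'seqn:events')
    #     seqn.index()    # next offset to be written; 0 for a fresh sequence
    #     seqn.size       # number of entries currently stored
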
    def pop(self, offs):
        '''
        Pop a single entry at the given offset.
        '''
        byts = self.slab.pop(s_common.int64en(offs), db=self.db)
        if byts is not None:
            self.size -= 1
            return (offs, s_msgpack.un(byts))

    async def cull(self, offs):
        '''
        Remove entries up to (and including) the given offset.
        '''
        for itemoffs, valu in self.iter(0):

            if itemoffs > offs:
                return

            if self.slab.delete(s_common.int64en(itemoffs), db=self.db):
                self.size -= 1

            await asyncio.sleep(0)

    def add(self, item, indx=None):
        '''
        Add a single item to the sequence.
        '''
        if indx is not None:

            if indx >= self.indx:
                self.slab.put(s_common.int64en(indx), s_msgpack.en(item), append=True, db=self.db)
                self.indx = indx + 1
                self.size += 1
                self._wake_waiters()
                return indx

            oldv = self.slab.replace(s_common.int64en(indx), s_msgpack.en(item), db=self.db)
            if oldv is None:
                self.size += 1
            return indx

        indx = self.indx

        retn = self.slab.put(s_common.int64en(indx), s_msgpack.en(item), append=True, db=self.db)
        assert retn, "Not adding the largest index"

        self.indx += 1
        self.size += 1
        self._wake_waiters()
        return indx

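    # Usage sketch for add() (hypothetical values):
    #
    #     offs = seqn.add('foo')       # append at the next free offset
    #     seqn.add('bar', indx=10)     # write at offset 10, advancing indx to 11
    #     seqn.add('baz', indx=10)     # 10 is now < indx, so this replaces in place
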
    def first(self):

        for lkey, lval in self.slab.scanByFull(db=self.db):
            return s_common.int64un(lkey), s_msgpack.un(lval)

        return None

    def last(self):

        last = self.slab.last(db=self.db)
        if last is None:
            return None

        lkey, lval = last

        indx = s_common.int64un(lkey)
        return indx, s_msgpack.un(lval)

    def stat(self):
        return self.slab.stat(db=self.db)

    async def save(self, items):
        '''
        Save a series of items to a sequence.

        Args:
            items (tuple): The series of items to save into the sequence.

        Returns:
            dict: Information about the save, including the first index used ('orig').
        '''
        rows = []
        indx = self.indx
        size = 0

        tick = s_common.now()
        abstick = s_common.mononow()

        for item in items:

            byts = s_msgpack.en(item)
            size += len(byts)

            lkey = s_common.int64en(indx)
            indx += 1

            rows.append((lkey, byts))

        retn = await self.slab.putmulti(rows, append=True, db=self.db)
        took = s_common.mononow() - abstick

        assert retn, "Not adding the largest indices"

        self.size += retn[1]

        origindx = self.indx
        self.indx = indx

        self._wake_waiters()

        return {'indx': indx, 'size': size, 'count': len(items), 'time': tick, 'took': took, 'orig': origindx}

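    # Usage sketch for save() (hypothetical; must be awaited inside an
    # asyncio event loop):
    #
    #     info = await seqn.save(('foo', 'bar', 'baz'))
    #     info['orig']     # first offset used for this batch
    #     info['count']    # -> 3
    #     info['indx']     # next offset after the batch
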
    def index(self):
        '''
        Return the next index to be used.
        '''
        return self.indx

    def nextindx(self):
        '''
        Determine the next insert offset according to storage.

        Returns:
            int: The next insert offset.
        '''
        byts = self.slab.lastkey(db=self.db)
        if byts is None:
            return 0
        return s_common.int64un(byts) + 1

    def iter(self, offs, reverse=False):
        '''
        Iterate over items in a sequence from a given offset.

        Args:
            offs (int): The offset to begin iterating from.
            reverse (bool): Iterate in descending offset order.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        startkey = s_common.int64en(offs)

        if reverse:
            for lkey, lval in self.slab.scanByRangeBack(startkey, db=self.db):
                offs = s_common.int64un(lkey)
                valu = s_msgpack.un(lval)
                yield offs, valu
        else:
            for lkey, lval in self.slab.scanByRange(startkey, db=self.db):
                offs = s_common.int64un(lkey)
                valu = s_msgpack.un(lval)
                yield offs, valu

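    # Usage sketch for iter() (hypothetical):
    #
    #     for indx, valu in seqn.iter(0):
    #         ...                          # ascending from offset 0
    #
    #     for indx, valu in seqn.iter(seqn.index() - 1, reverse=True):
    #         ...                          # descending from the last offset
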
    async def aiter(self, offs, wait=False, timeout=None):
        '''
        Iterate over items in a sequence from a given offset.

        Args:
            offs (int): The offset to begin iterating from.
            wait (boolean): Once caught up, yield new results in realtime.
            timeout (int): Max time to wait for a new item.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        startkey = s_common.int64en(offs)

        scanoffs = None
        for lkey, lval in self.slab.scanByRange(startkey, db=self.db):
            scanoffs = s_common.int64un(lkey)
            valu = s_msgpack.un(lval)
            yield scanoffs, valu

        # no awaiting between here and evnt.timewait()
        if wait:

            if scanoffs is None:
                offs -= 1
            else:
                offs = scanoffs

            evnt = s_coro.Event()

            try:
                self.addevents.append(evnt)

                while True:

                    evnt.clear()

                    if not await evnt.timewait(timeout=timeout):
                        return

                    startkey = s_common.int64en(offs + 1)
                    for lkey, lval in self.slab.scanByRange(startkey, db=self.db):
                        offs = s_common.int64un(lkey)
                        valu = s_msgpack.un(lval)
                        yield offs, valu

            finally:
                self.addevents.remove(evnt)

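    # Usage sketch for aiter() (hypothetical): replay existing entries, then
    # tail for new ones, giving up after 60 seconds of silence.
    #
    #     async for indx, valu in seqn.aiter(0, wait=True, timeout=60):
    #         ...
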
    async def gets(self, offs, wait=True):
        '''
        Returns an async generator of indx/valu tuples, optionally waiting
        and continuing to yield them as new entries are added.

        Args:
            offs (int): The offset to begin iterating from.
            wait (bool): Whether to continue yielding tuples when it hits the end of the sequence.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        while True:

            for (indx, valu) in self.iter(offs):
                yield (indx, valu)
                offs = indx + 1

            if not wait:
                return

            await self.waitForOffset(self.indx)

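    # Usage sketch for gets() (hypothetical tailing consumer; ``handle`` is a
    # placeholder coroutine, not part of this module):
    #
    #     async for indx, valu in seqn.gets(0):
    #         await handle(indx, valu)     # blocks at the tail until new items land
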
    def trim(self, offs):
        '''
        Delete entries starting at offset and moving forward.
        '''
        retn = False
        startkey = s_common.int64en(offs)

        for lkey, _ in self.slab.scanByRange(startkey, db=self.db):
            retn = True
            if self.slab.delete(lkey, db=self.db):
                self.size -= 1

        if retn:
            self.indx = self.nextindx()

        return retn

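    # Sketch contrasting trim() and cull() (hypothetical offsets):
    #
    #     seqn.trim(100)         # delete offsets >= 100 and rewind the insert index
    #     await seqn.cull(50)    # delete offsets <= 50; the insert index is unchanged
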
    def iterBack(self, offs):
        '''
        Iterate backwards over items in a sequence from a given offset.

        Args:
            offs (int): The offset to begin iterating from.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        startkey = s_common.int64en(offs)

        for lkey, lval in self.slab.scanByRangeBack(startkey, db=self.db):
            indx = s_common.int64un(lkey)
            valu = s_msgpack.un(lval)
            yield indx, valu

    def rows(self, offs):
        '''
        Iterate over raw indx, bytes tuples from a given offset.
        '''
        startkey = s_common.int64en(offs)
        for lkey, byts in self.slab.scanByRange(startkey, db=self.db):
            indx = s_common.int64un(lkey)
            yield indx, byts

    def get(self, offs):
        '''
        Retrieve a single row by offset.
        '''
        lkey = s_common.int64en(offs)
        valu = self.slab.get(lkey, db=self.db)
        if valu is not None:
            return s_msgpack.un(valu)

    def getraw(self, byts):
        valu = self.slab.get(byts, db=self.db)
        if valu is not None:
            return s_msgpack.un(valu)

    def slice(self, offs, size):

        imax = size - 1

        for i, item in enumerate(self.iter(offs)):

            yield item

            if i == imax:
                break

    def sliceBack(self, offs, size):

        imax = size - 1

        for i, item in enumerate(self.iterBack(offs)):

            yield item

            if i == imax:
                break

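    # Usage sketch for slice()/sliceBack() (hypothetical paging):
    #
    #     page = list(seqn.slice(0, 25))                       # at most 25 items, ascending from 0
    #     tail = list(seqn.sliceBack(seqn.index() - 1, 25))    # last 25 items, descending
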
    def getByIndxByts(self, indxbyts):
        byts = self.slab.get(indxbyts, db=self.db)
        if byts is not None:
            return s_msgpack.un(byts)

    def getOffsetEvent(self, offs):
        '''
        Returns an asyncio Event that will be set when the particular offset is written.
        The event will be set if the offset has already been reached.
        '''
        evnt = asyncio.Event()

        if offs < self.indx:
            evnt.set()
            return evnt

        # We add a simple counter to the tuple to cause stable (and FIFO) sorting and prevent ties
        heapq.heappush(self.offsevents, (offs, self._waitcounter, evnt))
        self._waitcounter += 1

        return evnt

    async def waitForOffset(self, offs, timeout=None):
        '''
        Wait until the given offset has been written.

        Returns:
            True if the event got set, False if timed out.
        '''
        if offs < self.indx:
            return True

        evnt = self.getOffsetEvent(offs)
        return await s_coro.event_wait(evnt, timeout=timeout)

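    # Usage sketch for waitForOffset() (hypothetical; inside an asyncio task):
    #
    #     if not await seqn.waitForOffset(1000, timeout=30):
    #         ...    # timed out before offset 1000 was written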