# synapse.lib.multislabseqn -- an append-optimized sequence stored across multiple LMDB slabs

from __future__ import annotations

import os
import heapq
import bisect
import shutil
import asyncio
import logging
import contextlib

import regex

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.slabseqn as s_slabseqn
import synapse.lib.lmdbslab as s_lmdbslab

from typing import List, Tuple, Dict, Optional, Any, AsyncIterator

logger = logging.getLogger(__name__)

# Matches a slab directory basename such as 'seqn0000000000000000.lmdb' (the form produced by
# MultiSlabSeqn.slabFilename); group 1 holds the 16 lowercase hex digits encoding the starting index of that slab.
seqnslabre = regex.compile(r'^seqn([0-9a-f]{16})\.lmdb$')

class MultiSlabSeqn(s_base.Base):
    '''
    An append-optimized sequence of byte blobs stored across multiple slabs for fast rotating/culling

    Each slab lives in ``dirn`` as ``seqn<16 hex digits>.lmdb`` where the hex digits are the starting index of
    that slab.  New entries are appended to the last ("tail") slab; ``rotate()`` starts a new tail and ``cull()``
    removes from disk whole slabs that fall entirely at or below the cull offset.
    '''

    async def __anit__(self,  # type: ignore
                       dirn: str,
                       opts: Optional[Dict] = None,
                       slabopts: Optional[Dict] = None,
                       cell=None,
                       ):
        '''
        Args:
            dirn (str): directory where to store the slabs
            opts (Optional[Dict]): options for this multislab (not currently read by this class)
            slabopts (Optional[Dict]): options to pass through to the slab creation
            cell: Optional object whose ``checkFreeSpace`` is registered as a resize callback on every slab opened
        '''
        await s_base.Base.__anit__(self)

        if opts is None:
            opts = {}

        self.offsevents: List[Tuple[int, int, asyncio.Event]] = []  # as a heap
        self._waitcounter = 0

        self.cell = cell
        self.dirn: str = dirn
        s_common.gendir(self.dirn)

        self.slabopts: Dict[str, Any] = {} if slabopts is None else slabopts

        # The last/current slab
        self.tailslab: Optional[s_lmdbslab.Slab] = None
        self.tailseqn: Optional[s_slabseqn.SlabSeqn] = None

        # The most recently accessed slab/seqn that isn't the tail
        self._cacheslab: Optional[s_lmdbslab.Slab] = None
        self._cacheseqn: Optional[s_slabseqn.SlabSeqn] = None
        self._cacheridx: Optional[int] = None

        # A startidx -> (Slab, Seqn) dict for all open Slabs, so we don't accidentally open the same Slab twice
        self._openslabs: Dict[int, Tuple[s_lmdbslab.Slab, s_slabseqn.SlabSeqn]] = {}

        # Lock to avoid an open race
        self._openlock = asyncio.Lock()

        await self._discoverRanges()

        async def fini():
            for slab, _ in list(self._openslabs.values()):
                # We incref the slabs, so might have to fini multiple times
                count = 1
                while count:
                    count = await slab.fini()

        self.onfini(fini)

    def __repr__(self):
        return f'MultiSlabSeqn: {self.dirn!r}'

    @staticmethod
    def _getFirstIndx(slab) -> int:
        '''
        Read the persisted first index from the slab's ``info`` db; 0 if it was never written.
        '''
        db = slab.initdb('info')
        bytz = slab.get(b'firstindx', db=db)
        if bytz is None:
            return 0
        return s_common.int64un(bytz)

    @staticmethod
    def _setFirstIndx(slab, indx) -> bool:
        '''
        Persist ``indx`` as the first index in the slab's ``info`` db.
        '''
        db = slab.initdb('info')
        return slab.put(b'firstindx', s_common.int64en(indx), db=db)

    async def _discoverRanges(self):
        '''
        Go through the slabs and get the starting indices of the sequence in each slab

        Populates ``self._ranges``, ``self.firstindx`` and ``self.indx`` from what is on disk, then opens the last
        slab found (or a new one at 0) as the tail.

        Raises:
            s_exc.BadCoreStore: If the slabs overlap, a slab's name disagrees with its contents, or the stored
                first index is past the next write position.
        '''
        fnstartidx = 0
        lastidx = None
        self._ranges: List[int] = []  # Starting offsets of all the slabs in order
        self.firstindx = 0  # persistently-stored indicator of lowest index
        self.indx = 0  # The next place an add() will go
        lowindx = None

        # Make sure the files are in order
        for fn in sorted(s_common.listdir(self.dirn, glob='*seqn' + '[abcdef01234567890]' * 16 + '.lmdb')):

            if not os.path.isdir(fn):
                logger.warning(f'Found a non-directory {fn} where a directory should be')
                continue

            match = seqnslabre.match(os.path.basename(fn))
            assert match

            newstartidx = int(match.group(1), 16)

            assert newstartidx >= fnstartidx
            fnstartidx = newstartidx

            if lowindx is None:
                lowindx = fnstartidx

            if lastidx is not None:
                if fnstartidx <= lastidx:
                    mesg = f'Multislab: overlapping files ({fn}). Previous last index is {lastidx}.'
                    raise s_exc.BadCoreStore(mesg=mesg)

                if fnstartidx != lastidx + 1:
                    logger.debug(f'Multislab: gap in indices at {fn}. Previous last index is {lastidx}.')

            async with await s_lmdbslab.Slab.anit(fn, **self.slabopts) as slab:
                # The last slab's stored value wins; the tail is where cull() persists it
                self.firstindx = self._getFirstIndx(slab)

                # We use the old name of the sequence to ease migration from the old system
                seqn = slab.getSeqn('nexuslog')

                firstitem = seqn.first()
                if firstitem is None:
                    self.indx = fnstartidx
                else:
                    self.indx = seqn.indx

                    firstidx = firstitem[0]  # might not match the separately stored first index due to culling

                    if firstidx < fnstartidx:
                        # keyword mesg= so the text is kept in the exception info (as elsewhere in this class)
                        raise s_exc.BadCoreStore(mesg='Multislab: filename inconsistent with contents')

                lastidx = seqn.index() - 1

            self._ranges.append(fnstartidx)

        # An admin might have manually culled by rm'ing old slabs. Update firstidx accordingly.
        if lowindx is not None and lowindx > self.firstindx:
            self.firstindx = lowindx

        if self.firstindx > self.indx:
            raise s_exc.BadCoreStore(mesg='Invalid firstindx value')

        await self._initTailSlab(fnstartidx)

    @staticmethod
    def slabFilename(dirn: str, indx: int):
        '''
        Return the path of the slab that starts at ``indx`` inside ``dirn``.
        '''
        return s_common.genpath(dirn, f'seqn{indx:016x}.lmdb')

    async def _initTailSlab(self, indx: int) -> int:
        '''
        Make the slab starting at ``indx`` the tail slab, finishing the previous tail (if any).

        Returns:
            int: ``indx``
        '''
        if self.tailslab:
            await self.tailslab.fini()

        self.tailslab, self.tailseqn = await self._makeSlab(indx)

        if not self.tailslab.dbexists('info'):
            self._setFirstIndx(self.tailslab, self.firstindx)

        # A brand new slab starts at indx.  A reopened, non-empty slab already reports its next index via
        # seqn.indx (which _discoverRanges relies on), so never rewind it to the start of the slab.
        if self.tailseqn.indx < indx:
            self.tailseqn.indx = indx

        # On startup _discoverRanges() has already recorded this slab; listing it twice would make iter()
        # revisit it and cull() treat the open tail as a cullable slab.
        if not self._ranges or self._ranges[-1] != indx:
            self._ranges.append(indx)

        return indx

    def _wake_waiters(self) -> None:
        '''
        Set (and drop) every pending offset event whose offset is now below ``self.indx``.
        '''
        while self.offsevents and self.offsevents[0][0] < self.indx:
            _, _, evnt = heapq.heappop(self.offsevents)
            evnt.set()

    async def rotate(self) -> int:
        '''
        Rotate the Nexus log at the current index.

        Note:
            After this executes the tailseqn will be empty. Waiting for this indx to be written will indicate when
            it is possible to cull 1 minus the return value such that the rotated seqn is deleted.

        Returns:
            int: The starting index of the new seqn
        '''
        assert self.tailslab and self.tailseqn and self._ranges

        if self.indx <= self._ranges[-1]:
            logger.info('Seqn %s at indx %d is empty', self.tailslab.path, self.indx)
            return self._ranges[-1]

        logger.info('Rotating %s at indx %d', self.tailslab.path, self.indx)

        return await self._initTailSlab(self.indx)

    async def cull(self, offs: int) -> bool:
        '''
        Remove entries up to (and including) the given offset.

        Args:
            offs (int): The highest offset to remove.

        Returns:
            bool: True if the cull was applied; False if ``offs`` is below the first index, or culling it would
            leave the sequence without any entries.

        Raises:
            s_exc.SlabInUse: If a slab that had to be examined is currently open by another task.
        '''
        logger.info('Culling %s at offs %d', self.dirn, offs)

        # Note: we don't bother deleting the rows from inside a partially culled slab. We just update self.firstindx
        # so nothing will return those rows anymore. We only delete from disk entire slabs once they are culled.

        if offs < self.firstindx:
            logger.warning('Unable to cull %s; offs (%d) < starting indx (%d)', self.dirn, offs, self.firstindx)
            return False

        # We keep at least one entry; this avoids offsets possibly going lower after a restart
        if offs >= self.indx - 1:
            logger.warning('Unable to cull %s at offs %d; must keep at least one entry', self.dirn, offs)
            return False

        if self._cacheridx is not None:
            self._cacheridx = None
            cacheslab = self._cacheslab
            assert cacheslab
            # Forget the cache entry before awaiting so nothing else can fini it a second time
            self._cacheslab = self._cacheseqn = None
            await cacheslab.fini()

        # Walk the non-tail slabs that existed when the cull started, oldest first.  Each slab is dropped from
        # self._ranges (and firstindx raised past it) as soon as it is removed from disk, so bailing out part way
        # (e.g. on SlabInUse) never leaves the bookkeeping pointing at a deleted slab, which a later read would
        # otherwise reopen and thereby recreate empty.
        for _ in range(len(self._ranges) - 1):

            if len(self._ranges) < 2:  # the tail is never removed
                break

            startidx = self._ranges[0]
            if self._openslabs.get(startidx):
                raise s_exc.SlabInUse(mesg='Attempt to cull while another task is still using it')

            fn = self.slabFilename(self.dirn, startidx)
            if offs < self._ranges[1] - 1:
                logger.warning('Log %s will not be deleted since offs is less than last indx', fn)
                break

            optspath = s_common.switchext(fn, ext='.opts.yaml')
            try:
                os.unlink(optspath)
            except FileNotFoundError:  # pragma: no cover
                pass

            logger.info('Removing log %s with startidx %d', fn, startidx)
            shutil.rmtree(fn)

            del self._ranges[0]
            self.firstindx = max(self.firstindx, self._ranges[0])

            await asyncio.sleep(0)

        self.firstindx = offs + 1
        self._setFirstIndx(self.tailslab, offs + 1)

        # Log if there was an attempt to cull into the tailseqn
        if offs >= self._ranges[-1]:
            fn = self.tailslab.path
            logger.warning('Log %s will not be deleted since offs is in the currently active log', fn)

        return True

    async def _makeSlab(self, startidx: int) -> Tuple[s_lmdbslab.Slab, s_slabseqn.SlabSeqn]:
        '''
        Open the slab starting at ``startidx``, or incref and return it if it is already open.

        Returns:
            Tuple[Slab, SlabSeqn]: The slab and its ``nexuslog`` sequence.
        '''
        async with self._openlock:  # Avoid race in two tasks making the same slab

            item = self._openslabs.get(startidx)
            if item is not None:
                item[0].incref()
                return item

            fn = self.slabFilename(self.dirn, startidx)

            slab = await s_lmdbslab.Slab.anit(fn, **self.slabopts)
            if self.cell is not None:
                slab.addResizeCallback(self.cell.checkFreeSpace)

            seqn = slab.getSeqn('nexuslog')

            self._openslabs[startidx] = slab, seqn

            def fini():
                self._openslabs.pop(startidx, None)

            slab.onfini(fini)

            return slab, seqn

    @contextlib.asynccontextmanager
    async def _getSeqn(self, ridx: int) -> AsyncIterator[s_slabseqn.SlabSeqn]:
        '''
        Get the sequence corresponding to an index into self._ranges

        The slab is incref'd for the duration of the context (balanced by the fini() on exit) so it is not
        closed out from under the caller if the one-entry cache is evicted meanwhile.
        '''
        if ridx == len(self._ranges) - 1:
            assert self.tailslab and self.tailseqn
            slab, seqn = self.tailslab, self.tailseqn

        elif ridx == self._cacheridx:
            assert self._cacheslab and self._cacheseqn
            slab, seqn = self._cacheslab, self._cacheseqn

        else:
            startidx = self._ranges[ridx]

            # Evict the current cache entry, forgetting it *before* awaiting so nothing else (cull(), or this code
            # again after a failed _makeSlab() below) can fini the same slab a second time.
            oldslab = self._cacheslab
            self._cacheridx = None
            self._cacheslab = self._cacheseqn = None
            if oldslab is not None:
                await oldslab.fini()

            slab, seqn = self._cacheslab, self._cacheseqn = await self._makeSlab(startidx)
            self._cacheridx = ridx

        slab.incref()
        try:
            yield seqn
        finally:
            await slab.fini()

    async def add(self, item: Any, indx=None) -> int:
        '''
        Add a single item to the sequence.

        Args:
            item (Any): The item to store.
            indx (Optional[int]): Explicit index to write at; defaults to the next index (``self.indx``).  An
                index below the start of the tail slab is written into the older slab that covers it.

        Returns:
            int: ``indx`` when writing into an older slab; otherwise the value returned by the tail
            ``SlabSeqn.add`` (presumably the index written -- confirm in slabseqn).

        Raises:
            s_exc.BadIndxValu: If ``indx`` is lower than the first index in the sequence.
        '''
        advances = True

        if indx is not None:
            if indx < self.firstindx:
                raise s_exc.BadIndxValu(mesg=f'indx lower than first index in sequence {self.firstindx}')

            if indx < self._ranges[-1]:
                ridx = self._getRangeIndx(indx)
                assert ridx is not None

                async with self._getSeqn(ridx) as seqn:
                    seqn.add(item, indx=indx)

                return indx

            if indx >= self.indx:
                self.indx = indx
            else:
                # Overwriting an existing tail entry does not move the write position
                advances = False

        else:
            indx = self.indx

        assert self.tailseqn
        retn = self.tailseqn.add(item, indx=indx)

        if advances:
            self.indx += 1
            self._wake_waiters()

        return retn

    async def last(self) -> Optional[Tuple[int, Any]]:
        '''
        Return the last entry (as given by ``SlabSeqn.last()``), or None if there is nothing at or above firstindx.
        '''
        ridx = self._getRangeIndx(self.indx - 1)
        if ridx is None:
            return None

        async with self._getSeqn(ridx) as seqn:
            return seqn.last()

    def index(self) -> int:
        '''
        Return the current index to be used
        '''
        return self.indx

    def setIndex(self, indx: int) -> None:
        '''
        Overwrite the next write position (does not wake offset waiters).
        '''
        self.indx = indx

    def _getRangeIndx(self, offs: int) -> Optional[int]:
        '''
        Return the index into self._ranges that contains the offset, or None if offs is below firstindx.
        '''
        if offs < self.firstindx:
            return None

        indx = bisect.bisect_right(self._ranges, offs)
        assert indx

        return indx - 1

    async def iter(self, offs: int) -> AsyncIterator[Tuple[int, Any]]:
        '''
        Iterate over items in a sequence from a given offset.

        Args:
            offs (int): The offset to begin iterating from.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        offs = max(offs, self.firstindx)

        ridx = self._getRangeIndx(offs)
        assert ridx is not None

        # self._ranges can be appended to (rotate) or trimmed at the front (cull) while we are suspended at a
        # yield, so step to the next slab by its starting offset rather than by position.
        while ridx < len(self._ranges):
            startidx = self._ranges[ridx]

            async with self._getSeqn(ridx) as seqn:
                for item in seqn.iter(max(offs, startidx)):
                    yield item

            ridx = bisect.bisect_right(self._ranges, startidx)

    async def gets(self, offs, wait=True) -> AsyncIterator[Tuple[int, Any]]:
        '''
        Just like iter, but optionally waits for new entries once the end is reached.

        Args:
            offs (int): The offset to begin iterating from.
            wait (bool): If False, stop once the current end is reached.

        Yields:
            (indx, valu): The index and valu of the item.
        '''
        while True:

            # Note the write position before iterating: anything written from here on is either yielded below or
            # makes the wait return immediately so we iterate again, rather than waiting for the write after it.
            nextindx = self.indx

            async for (indx, valu) in self.iter(offs):
                yield (indx, valu)
                offs = indx + 1

            if not wait:
                return

            # If the caller asked for a future offset, wait for that directly instead of waking on every write.
            await self.waitForOffset(max(offs, nextindx))

    async def get(self, offs: int) -> Any:
        '''
        Retrieve a single row by offset

        Raises:
            s_exc.BadIndxValu: If ``offs`` is lower than the first index.
        '''
        ridx = self._getRangeIndx(offs)
        if ridx is None:
            raise s_exc.BadIndxValu(mesg=f'offs lower than first index {self.firstindx}')

        async with self._getSeqn(ridx) as seqn:
            return seqn.get(offs)

    def getOffsetEvent(self, offs: int) -> asyncio.Event:
        '''
        Returns an asyncio Event that will be set when the particular offset is written. The event will be set if the
        offset has already been reached.
        '''
        evnt = asyncio.Event()

        if offs < self.indx:
            evnt.set()
            return evnt

        # We add a simple counter to the tuple to cause stable (and FIFO) sorting and prevent ties
        heapq.heappush(self.offsevents, (offs, self._waitcounter, evnt))

        self._waitcounter += 1

        return evnt

    async def waitForOffset(self, offs: int, timeout=None) -> bool:
        '''
        Returns:
            true if the event got set, False if timed out
        '''
        if offs < self.indx:
            return True

        evnt = self.getOffsetEvent(offs)
        return await s_coro.event_wait(evnt, timeout=timeout)