Source code for synapse.lib.spooled

import tempfile

import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.const as s_const
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab

# Default for the ``size`` argument of the spooled containers below: the maximum
# number of items stored in RAM before they are spooled to disk.
MAX_SPOOL_SIZE = 10000

class Spooled(s_base.Base):
    '''
    A Base class that can be used to implement objects which fallback to lmdb.

    These objects are intended to fallback from Python containers to lmdb slabs.
    Under memory pressure, slab backed objects have a better shot of getting
    paged out.

    Subclasses call ``_initFallBack()`` to create the backing slab; the slab
    (if one was created) is trashed when the object is fini'd.
    '''
    async def __anit__(self, dirn=None, size=MAX_SPOOL_SIZE, cell=None):
        '''
        Args:
            dirn(Optional[str]): base directory used for backing slab.  If None, system temporary directory is used
            size(int): maximum number of items stored in RAM before spooled to disk
            cell: optional object; when given, its ``checkFreeSpace`` is registered as a
                resize callback on the backing slab
        '''
        await s_base.Base.__anit__(self)

        self.dirn = dirn
        self.size = size
        self.cell = cell

        # No slab exists until a subclass triggers the fallback.
        self.fallback = False
        self.slab = None

        async def _trashSlab():
            slab = self.slab
            if slab is None:
                return
            await slab.trash()

        self.onfini(_trashSlab)

    async def _initFallBack(self):
        '''
        Switch to fallback mode and create a fresh temporary slab in ``self.slab``.
        '''
        self.fallback = True

        # Consolidate the spooled slabs underneath 'tmp' to make it easy for backup tool to avoid copying
        tmpdir = None if self.dirn is None else s_common.gendir(self.dirn, 'tmp')

        path = tempfile.mkdtemp(dir=tmpdir, prefix='spooled_', suffix='.lmdb')
        self.slab = await s_lmdbslab.Slab.anit(path, map_size=s_const.mebibyte * 32)

        if self.cell is None:
            return

        self.slab.addResizeCallback(self.cell.checkFreeSpace)
class Set(Spooled):
    '''
    A minimal set-like implementation that will spool to a slab on large growth.

    Items are held in a Python set (``realset``) until ``size`` items are
    present.  At that point every item is msgpack encoded and stored as a key
    in the backing slab, and all later operations go to the slab.
    '''
    async def __anit__(self, dirn=None, size=MAX_SPOOL_SIZE, cell=None):
        await Spooled.__anit__(self, dirn=dirn, size=size, cell=cell)
        self.realset = set()
        # Item count used by __len__ once in fallback mode.
        self.len = 0

    async def __aiter__(self):
        '''
        Yield each item, from RAM or (msgpack decoded) from the slab keys.
        '''
        if not self.fallback:
            for item in self.realset:
                yield item
            return

        for byts in self.slab.scanKeys():
            yield s_msgpack.un(byts)

    def __contains__(self, valu):
        if self.fallback:
            return self.slab.get(s_msgpack.en(valu)) is not None
        return valu in self.realset

    def __len__(self):
        '''
        Returns how many items are in the set, regardless of whether in RAM or backed to slab
        '''
        if self.fallback:
            return self.len
        return len(self.realset)

    async def copy(self):
        '''
        Return a new Set with the same dirn, size, cell and contents.
        '''
        newset = await Set.anit(dirn=self.dirn, size=self.size, cell=self.cell)

        if self.fallback:
            await newset._initFallBack()
            await self.slab.copydb(None, newset.slab)
            newset.len = self.len
        else:
            newset.realset = self.realset.copy()

        return newset

    async def clear(self):
        '''
        Remove all items.  A set which has fallen back stays in fallback mode
        with a fresh, empty slab.
        '''
        if self.fallback:
            self.len = 0
            await self.slab.trash()
            await self._initFallBack()
        else:
            self.realset.clear()

    async def add(self, valu):
        '''
        Add an item to the set, spooling to the slab once ``size`` items are held in RAM.
        '''
        if self.fallback:
            # overwrite=False: only count the item when put() reports a truthy result
            # (presumably meaning the key was not already present - confirm against Slab.put).
            if self.slab.put(s_msgpack.en(valu), b'\x01', overwrite=False):
                self.len += 1
            return

        self.realset.add(valu)

        if len(self.realset) >= self.size:
            await self._initFallBack()
            # Plain loop rather than a comprehension: this runs only for its side
            # effects, so there is no reason to build a list of put() results.
            for item in self.realset:
                self.slab.put(s_msgpack.en(item), b'\x01')
            self.len = len(self.realset)
            self.realset.clear()

    def has(self, key):
        '''
        Return whether key is in the set.
        '''
        if self.fallback:
            return self.slab.has(s_msgpack.en(key))
        return key in self.realset

    def discard(self, valu):
        '''
        Remove an item from the set if it is present.
        '''
        if self.fallback:
            ret = self.slab.pop(s_msgpack.en(valu))
            if ret is None:
                # Nothing was stored under that key, so the count is unchanged.
                return
            self.len -= 1
            return

        self.realset.discard(valu)
class Dict(Spooled):
    '''
    A minimal dict-like implementation that will spool to a slab on large growth.

    Entries are held in a Python dict (``realdict``) until ``size`` entries are
    present.  At that point every key and value is msgpack encoded and stored
    in the backing slab, and all later operations go to the slab.
    '''
    async def __anit__(self, dirn=None, size=MAX_SPOOL_SIZE, cell=None):
        await Spooled.__anit__(self, dirn=dirn, size=size, cell=cell)
        self.realdict = {}
        # Entry count used by __len__ once in fallback mode.
        self.len = 0

    def __len__(self):
        '''
        Returns how many entries are in the dict, regardless of whether in RAM or backed to slab
        '''
        if self.fallback:
            return self.len
        return len(self.realdict)

    async def set(self, key, val):
        '''
        Store val under key, spooling to the slab once ``size`` entries are held in RAM.
        '''
        if self.fallback:
            # Only count the entry when replace() returns None (presumably meaning
            # there was no previous value for the key - confirm against Slab.replace).
            if self.slab.replace(s_msgpack.en(key), s_msgpack.en(val)) is None:
                self.len += 1
            return

        self.realdict[key] = val

        if len(self.realdict) >= self.size:
            await self._initFallBack()
            # Plain loop rather than a comprehension: this runs only for its side
            # effects, so there is no reason to build a list of put() results.
            for k, v in self.realdict.items():
                self.slab.put(s_msgpack.en(k), s_msgpack.en(v))
            self.len = len(self.realdict)
            self.realdict.clear()

    def pop(self, key, defv=None):
        '''
        Remove key and return its value, or defv if the key is not present.
        '''
        if self.fallback:
            ret = self.slab.pop(s_msgpack.en(key))
            if ret is None:
                return defv
            self.len -= 1
            return s_msgpack.un(ret)

        return self.realdict.pop(key, defv)

    def has(self, key):
        '''
        Return whether key is in the dict.
        '''
        if self.fallback:
            return self.slab.has(s_msgpack.en(key))
        return key in self.realdict

    def get(self, key, defv=None):
        '''
        Return the value stored under key, or defv if the key is not present.
        '''
        if self.fallback:
            byts = self.slab.get(s_msgpack.en(key))
            if byts is None:
                return defv
            return s_msgpack.un(byts)

        return self.realdict.get(key, defv)

    def keys(self):
        '''
        Yield each key, from RAM or (msgpack decoded) from the slab.
        '''
        if self.fallback:
            for lkey in self.slab.scanKeys():
                yield s_msgpack.un(lkey)
            # realdict is emptied when the dict falls back, so there is nothing
            # further to yield from it.
            return

        # avoid edit while iter issues...
        for key in list(self.realdict.keys()):
            yield key

    def items(self):
        '''
        Yield each (key, value) pair, from RAM or (msgpack decoded) from the slab.
        '''
        if self.fallback:
            for lkey, lval in self.slab.scanByFull():
                yield s_msgpack.un(lkey), s_msgpack.un(lval)
            # realdict is emptied when the dict falls back, so there is nothing
            # further to yield from it.
            return

        # Iterate a snapshot to avoid edit while iter issues.
        for item in list(self.realdict.items()):
            yield item