Source code for synapse.cryotank

import os
import shutil
import asyncio
import logging

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.cell as s_cell
import synapse.lib.storm as s_storm
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.slabseqn as s_slabseqn
import synapse.lib.slaboffs as s_slaboffs

logger = logging.getLogger(__name__)

[docs] class TankApi(s_cell.CellApi):
[docs] async def slice(self, offs, size=None, wait=False, timeout=None): self.user.confirm(('cryo', 'tank', 'read'), gateiden=self.cell.iden()) async for item in self.cell.slice(offs, size=size, wait=wait, timeout=timeout): yield item
[docs] async def puts(self, items): self.user.confirm(('cryo', 'tank', 'put'), gateiden=self.cell.iden()) return await self.cell.puts(items)
[docs] async def metrics(self, offs, size=None): self.user.confirm(('cryo', 'tank', 'read'), gateiden=self.cell.iden()) async for item in self.cell.metrics(offs, size=size): yield item
[docs] async def iden(self): return self.cell.iden()
[docs] class CryoTank(s_base.Base): ''' A CryoTank implements a stream of structured data. ''' async def __anit__(self, dirn, iden, conf=None): await s_base.Base.__anit__(self) if conf is None: conf = {} self.conf = conf self.dirn = s_common.gendir(dirn) self._iden = iden path = s_common.gendir(self.dirn, 'tank.lmdb') self.slab = await s_lmdbslab.Slab.anit(path, map_async=True, **conf) self._items = s_slabseqn.SlabSeqn(self.slab, 'items') self._metrics = s_slabseqn.SlabSeqn(self.slab, 'metrics') self.onfini(self.slab.fini)
[docs] def iden(self): return self._iden
[docs] def last(self): ''' Return an (offset, item) tuple for the last element in the tank ( or None ). ''' return self._items.last()
[docs] async def puts(self, items): ''' Add the structured data from items to the CryoTank. Args: items (list): A list of objects to store in the CryoTank. Returns: int: The ending offset of the items or seqn. ''' size = 0 for chunk in s_common.chunks(items, 1000): metrics = await self._items.save(chunk) self._metrics.add(metrics) await self.fire('cryotank:puts', numrecords=len(chunk)) size += len(chunk) await asyncio.sleep(0) return size
[docs] async def metrics(self, offs, size=None): ''' Yield metrics rows starting at offset. Args: offs (int): The index offset. size (int): The maximum number of records to yield. Yields: ((int, dict)): An index offset, info tuple for metrics. ''' for i, (indx, item) in enumerate(self._metrics.iter(offs)): if size is not None and i >= size: return yield indx, item
[docs] async def slice(self, offs, size=None, wait=False, timeout=None): ''' Yield a number of items from the CryoTank starting at a given offset. Args: offs (int): The index of the desired datum (starts at 0) size (int): The max number of items to yield. wait (bool): Once caught up, yield new results in realtime timeout (int): Max time to wait for a new item. Yields: ((index, object)): Index and item values. ''' i = 0 async for indx, item in self._items.aiter(offs, wait=wait, timeout=timeout): if size is not None and i >= size: return yield indx, item i += 1 await asyncio.sleep(0)
[docs] async def rows(self, offs, size=None): ''' Yield a number of raw items from the CryoTank starting at a given offset. Args: offs (int): The index of the desired datum (starts at 0) size (int): The max number of items to yield. Yields: ((indx, bytes)): Index and msgpacked bytes. ''' for i, (indx, byts) in enumerate(self._items.rows(offs)): if size is not None and i >= size: return yield indx, byts
[docs] async def info(self): ''' Returns information about the CryoTank instance. Returns: dict: A dict containing items and metrics indexes. ''' stat = self._items.stat() return { 'iden': self._iden, 'indx': self._items.index(), 'metrics': self._metrics.index(), 'stat': stat, }
[docs] class CryoApi(s_cell.CellApi): ''' The CryoCell API as seen by a telepath proxy. This is the API to reference for remote CryoCell use. '''
[docs] async def init(self, name, conf=None): tank = await self.cell.init(name, conf=conf, user=self.user) return tank.iden()
[docs] async def slice(self, name, offs, size=None, wait=False, timeout=None): tank = await self.cell.init(name, user=self.user) self.user.confirm(('cryo', 'tank', 'read'), gateiden=tank.iden()) async for item in tank.slice(offs, size=size, wait=wait, timeout=timeout): yield item
[docs] async def list(self): return await self.cell.list(user=self.user)
[docs] async def last(self, name): tank = await self.cell.init(name, user=self.user) self.user.confirm(('cryo', 'tank', 'read'), gateiden=tank.iden()) return tank.last()
[docs] async def puts(self, name, items): tank = await self.cell.init(name, user=self.user) self.user.confirm(('cryo', 'tank', 'put'), gateiden=tank.iden()) return await tank.puts(items)
[docs] async def rows(self, name, offs, size): tank = await self.cell.init(name, user=self.user) self.user.confirm(('cryo', 'tank', 'read'), gateiden=tank.iden()) async for item in tank.rows(offs, size): yield item
[docs] async def metrics(self, name, offs, size=None): tank = await self.cell.init(name, user=self.user) self.user.confirm(('cryo', 'tank', 'read'), gateiden=tank.iden()) async for item in tank.metrics(offs, size=size): yield item
[docs] @s_cell.adminapi(log=True) async def delete(self, name): return await self.cell.delete(name)
[docs] class CryoCell(s_cell.Cell): cellapi = CryoApi tankapi = TankApi async def __anit__(self, dirn, conf=None, readonly=False): await s_cell.Cell.__anit__(self, dirn, conf) await self.auth.addAuthGate('cryo', 'cryo') self._cryo_permdefs = [] self._initCryoPerms() self.dmon.share('cryotank', self)
[docs] async def initServiceStorage(self): self.names = self.slab.getSafeKeyVal('cryo:names') await self._bumpCellVers('cryotank', ( (2, self._migrateToV2), (3, self._migrateToV3), ), nexs=False) self.tanks = await s_base.BaseRef.anit() self.onfini(self.tanks.fini) for name, (iden, conf) in self.names.items(): logger.info('Bringing tank [%s][%s] online', name, iden) path = s_common.genpath(self.dirn, 'tanks', iden) tank = await CryoTank.anit(path, iden, conf) self.tanks.put(name, tank) await self.auth.addAuthGate(iden, 'tank')
async def _migrateToV2(self): logger.warning('Beginning migration to V2') async with await self.hive.open(('cryo', 'names')) as names: for name, node in names: iden, conf = node.valu if conf is None: conf = {} logger.info(f'Migrating tank {name=} {iden=}') path = s_common.genpath(self.dirn, 'tanks', iden) # remove old guid file guidpath = s_common.genpath(path, 'guid') if os.path.isfile(guidpath): os.unlink(guidpath) # if its a legacy cell remove that too cellpath = s_common.genpath(path, 'cell.guid') if os.path.isfile(cellpath): os.unlink(cellpath) cellslabpath = s_common.genpath(path, 'slabs', 'cell.lmdb') if os.path.isdir(cellslabpath): shutil.rmtree(cellslabpath, ignore_errors=True) # drop offsets slabpath = s_common.genpath(path, 'tank.lmdb') async with await s_lmdbslab.Slab.anit(slabpath, **conf) as slab: offs = s_slaboffs.SlabOffs(slab, 'offsets') slab.dropdb(offs.db) logger.warning('...migration complete') async def _migrateToV3(self): logger.warning('Beginning migration to V3') async with await self.hive.open(('cryo', 'names')) as hivenames: for name, node in hivenames: iden, conf = node.valu self.names.set(name, (iden, conf)) logger.warning('...migration complete')
[docs] @classmethod def getEnvPrefix(cls): return ('SYN_CRYOTANK', )
def _initCryoPerms(self): self._cryo_permdefs.extend(( {'perm': ('cryo', 'tank', 'add'), 'gate': 'cryo', 'desc': 'Controls access to creating a new tank.'}, {'perm': ('cryo', 'tank', 'put'), 'gate': 'tank', 'desc': 'Controls access to adding data to a specific tank.'}, {'perm': ('cryo', 'tank', 'read'), 'gate': 'tank', 'desc': 'Controls access to reading data from a specific tank.'}, )) for pdef in self._cryo_permdefs: s_storm.reqValidPermDef(pdef) def _getPermDefs(self): permdefs = list(s_cell.Cell._getPermDefs(self)) permdefs.extend(self._cryo_permdefs) permdefs.sort(key=lambda x: x['perm']) return tuple(permdefs)
[docs] async def getCellApi(self, link, user, path): if not path: return await self.cellapi.anit(self, link, user) if len(path) == 1: tank = await self.init(path[0], user=user) return await self.tankapi.anit(tank, link, user) raise s_exc.NoSuchPath(path=path)
[docs] async def init(self, name, conf=None, user=None): ''' Generate a new CryoTank with a given name or get a reference to an existing CryoTank. Args: name (str): Name of the CryoTank. user (User): The Telepath user. Returns: CryoTank: A CryoTank instance. ''' tank = self.tanks.get(name) if tank is not None: return tank if user is not None: user.confirm(('cryo', 'tank', 'add'), gateiden='cryo') iden = s_common.guid() logger.info('Creating new tank: [%s][%s]', name, iden) path = s_common.genpath(self.dirn, 'tanks', iden) tank = await CryoTank.anit(path, iden, conf) self.names.set(name, (iden, conf)) self.tanks.put(name, tank) await self.auth.addAuthGate(iden, 'tank') if user is not None: await user.setAdmin(True, gateiden=tank.iden()) return tank
[docs] async def list(self, user=None): ''' Get a list of (name, info) tuples for the CryoTanks. Returns: list: A list of tufos. user (User): The Telepath user. ''' infos = [] for name, tank in self.tanks.items(): if user is not None and not user.allowed(('cryo', 'tank', 'read'), gateiden=tank.iden()): continue infos.append((name, await tank.info())) return infos
[docs] async def delete(self, name): tank = self.tanks.pop(name) if tank is None: return False iden, _ = self.names.pop(name) await tank.fini() shutil.rmtree(tank.dirn, ignore_errors=True) await self.auth.delAuthGate(iden) return True