Source code for synapse.axon

import csv
import json
import asyncio
import hashlib
import logging
import tempfile
import contextlib

import aiohttp
import aiohttp_socks

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.cell as s_cell
import synapse.lib.coro as s_coro
import synapse.lib.base as s_base
import synapse.lib.link as s_link
import synapse.lib.const as s_const
import synapse.lib.nexus as s_nexus
import synapse.lib.share as s_share
import synapse.lib.config as s_config
import synapse.lib.hashset as s_hashset
import synapse.lib.httpapi as s_httpapi
import synapse.lib.urlhelp as s_urlhelp
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.slabseqn as s_slabseqn

logger = logging.getLogger(__name__)

CHUNK_SIZE = 16 * s_const.mebibyte
MAX_SPOOL_SIZE = CHUNK_SIZE * 32  # 512 mebibytes
MAX_HTTP_UPLOAD_SIZE = 4 * s_const.tebibyte

[docs] class AxonHandlerMixin:
[docs] def getAxon(self): ''' Get a reference to the Axon interface used by the handler. ''' return self.cell
[docs] class AxonHttpUploadV1(AxonHandlerMixin, s_httpapi.StreamHandler):
[docs] async def prepare(self): self.upfd = None if not await self.allowed(('axon', 'upload')): await self.finish() # max_body_size defaults to 100MB and requires a value self.request.connection.set_max_body_size(MAX_HTTP_UPLOAD_SIZE) self.upfd = await self.getAxon().upload() self.hashset = s_hashset.HashSet()
[docs] async def data_received(self, chunk): if chunk is not None: await self.upfd.write(chunk) self.hashset.update(chunk) await asyncio.sleep(0)
[docs] def on_finish(self): if self.upfd is not None and not self.upfd.isfini: self.getAxon().schedCoroSafe(self.upfd.fini())
[docs] def on_connection_close(self): self.on_finish()
async def _save(self): size, sha256b = await self.upfd.save() fhashes = {htyp: hasher.hexdigest() for htyp, hasher in self.hashset.hashes} assert sha256b == s_common.uhex(fhashes.get('sha256')) assert size == self.hashset.size fhashes['size'] = size return self.sendRestRetn(fhashes)
[docs] async def post(self): ''' Called after all data has been read. ''' await self._save() return
[docs] async def put(self): await self._save() return
[docs] class AxonHttpHasV1(AxonHandlerMixin, s_httpapi.Handler):
[docs] async def get(self, sha256): if not await self.allowed(('axon', 'has')): return resp = await self.getAxon().has(s_common.uhex(sha256)) return self.sendRestRetn(resp)
reqValidAxonDel = s_config.getJsValidator({ 'type': 'object', 'properties': { 'sha256s': { 'type': 'array', 'items': {'type': 'string', 'pattern': '(?i)^[0-9a-f]{64}$'} }, }, 'additionalProperties': False, 'required': ['sha256s'], })
[docs] class AxonHttpDelV1(AxonHandlerMixin, s_httpapi.Handler):
[docs] async def post(self): if not await self.allowed(('axon', 'del')): return body = self.getJsonBody(validator=reqValidAxonDel) if body is None: return sha256s = body.get('sha256s') hashes = [s_common.uhex(s) for s in sha256s] resp = await self.getAxon().dels(hashes) return self.sendRestRetn(tuple(zip(sha256s, resp)))
[docs] class AxonFileHandler(AxonHandlerMixin, s_httpapi.Handler):
[docs] async def getAxonInfo(self): return await self.getAxon().getCellInfo()
async def _setSha256Headers(self, sha256b): self.ranges = [] self.blobsize = await self.getAxon().size(sha256b) if self.blobsize is None: self.set_status(404) self.sendRestErr('NoSuchFile', f'SHA-256 not found: {s_common.ehex(sha256b)}') return False status = 200 info = await self.getAxonInfo() if info.get('features', {}).get('byterange'): self.set_header('Accept-Ranges', 'bytes') self._chopRangeHeader() if len(self.ranges): status = 206 soff, eoff = self.ranges[0] # Negative lengths are invalid cont_len = eoff - soff if cont_len < 1: self.set_status(416) return False # Reading past the blobsize is invalid if soff >= self.blobsize: self.set_status(416) return False if soff + cont_len > self.blobsize: self.set_status(416) return False # ranges are *inclusive*... self.set_header('Content-Range', f'bytes {soff}-{eoff-1}/{self.blobsize}') self.set_header('Content-Length', str(cont_len)) # TODO eventually support multi-range returns else: self.set_header('Content-Length', str(self.blobsize)) self.set_status(status) return True def _chopRangeHeader(self): header = self.request.headers.get('range', '').strip().lower() if not header.startswith('bytes='): return for part in header.split('=', 1)[1].split(','): part = part.strip() if not part: continue soff, eoff = part.split('-', 1) soff = int(soff.strip()) eoff = eoff.strip() if not eoff: eoff = self.blobsize else: eoff = int(eoff) + 1 self.ranges.append((soff, eoff)) async def _sendSha256Byts(self, sha256b): # a single range is simple... if self.ranges: # TODO eventually support multi-range returns soff, eoff = self.ranges[0] size = eoff - soff async for byts in self.getAxon().get(sha256b, soff, size): self.write(byts) await self.flush() await asyncio.sleep(0) return # standard file return async for byts in self.getAxon().get(sha256b): self.write(byts) await self.flush() await asyncio.sleep(0)
[docs] class AxonHttpBySha256V1(AxonFileHandler):
[docs] async def head(self, sha256): if not await self.allowed(('axon', 'get')): return sha256b = s_common.uhex(sha256) if not await self._setSha256Headers(sha256b): return self.set_header('Content-Disposition', 'attachment') self.set_header('Content-Type', 'application/octet-stream') return self.finish()
[docs] async def get(self, sha256): if not await self.allowed(('axon', 'get')): return sha256b = s_common.uhex(sha256) if not await self._setSha256Headers(sha256b): return self.set_header('Content-Disposition', 'attachment') self.set_header('Content-Type', 'application/octet-stream') await self._sendSha256Byts(sha256b) return self.finish()
[docs] async def delete(self, sha256): if not await self.allowed(('axon', 'del')): return sha256b = s_common.uhex(sha256) if not await self.getAxon().has(sha256b): self.set_status(404) self.sendRestErr('NoSuchFile', f'SHA-256 not found: {sha256}') return resp = await self.getAxon().del_(sha256b) return self.sendRestRetn(resp)
[docs] class AxonHttpBySha256InvalidV1(AxonFileHandler): async def _handle_err(self, sha256, send_err=True): if not await self.reqAuthUser(): return self.set_status(404) if send_err: self.sendRestErr('BadArg', f'Hash is not a SHA-256: {sha256}')
[docs] async def delete(self, sha256): return await self._handle_err(sha256)
[docs] async def get(self, sha256): return await self._handle_err(sha256)
[docs] async def head(self, sha256): return await self._handle_err(sha256, send_err=False)
[docs] class UpLoad(s_base.Base): ''' An object used to manage uploads to the Axon. ''' async def __anit__(self, axon): # type: ignore await s_base.Base.__anit__(self) self.axon = axon dirn = s_common.gendir(axon.dirn, 'tmp') self.fd = tempfile.SpooledTemporaryFile(max_size=MAX_SPOOL_SIZE, dir=dirn) self.size = 0 self.sha256 = hashlib.sha256() self.onfini(self._uploadFini) def _uploadFini(self): self.fd.close() def _reset(self): if self.fd._rolled or self.fd.closed: self.fd.close() dirn = s_common.gendir(self.axon.dirn, 'tmp') self.fd = tempfile.SpooledTemporaryFile(max_size=MAX_SPOOL_SIZE, dir=dirn) else: # If we haven't rolled over, this skips allocating new objects self.fd.truncate(0) self.fd.seek(0) self.size = 0 self.sha256 = hashlib.sha256()
[docs] async def write(self, byts): ''' Write bytes to the Upload object. Args: byts (bytes): Bytes to write to the current Upload object. Returns: (None): Returns None. ''' self.size += len(byts) self.sha256.update(byts) self.fd.write(byts)
[docs] async def save(self): ''' Save the currently uploaded bytes to the Axon. Notes: This resets the Upload object, so it can be reused. Returns: tuple(int, bytes): A tuple of sizes in bytes and the sha256 hash of the saved files. ''' sha256 = self.sha256.digest() rsize = self.size if await self.axon.has(sha256): self._reset() return rsize, sha256 def genr(): self.fd.seek(0) while True: if self.isfini: raise s_exc.IsFini() byts = self.fd.read(CHUNK_SIZE) if not byts: return yield byts await self.axon.save(sha256, genr(), rsize) self._reset() return rsize, sha256
[docs] class UpLoadShare(UpLoad, s_share.Share): # type: ignore typename = 'upload' async def __anit__(self, axon, link): await UpLoad.__anit__(self, axon) await s_share.Share.__anit__(self, link, None)
[docs] class UpLoadProxy(s_share.Share): async def __anit__(self, link, upload): await s_share.Share.__anit__(self, link, upload) self.onfini(upload)
[docs] async def write(self, byts): return await self.item.write(byts)
[docs] async def save(self): return await self.item.save()
[docs] class AxonApi(s_cell.CellApi, s_share.Share): # type: ignore async def __anit__(self, cell, link, user): await s_cell.CellApi.__anit__(self, cell, link, user) await s_share.Share.__anit__(self, link, None)
[docs] async def get(self, sha256, offs=None, size=None): ''' Get bytes of a file. Args: sha256 (bytes): The sha256 hash of the file in bytes. offs (int): The offset to start reading from. size (int): The total number of bytes to read. Examples: Get the bytes from an Axon and process them:: buf = b'' async for bytz in axon.get(sha256): buf += bytz await dostuff(buf) Yields: bytes: Chunks of the file bytes. Raises: synapse.exc.NoSuchFile: If the file does not exist. ''' await self._reqUserAllowed(('axon', 'get')) async for byts in self.cell.get(sha256, offs=offs, size=size): yield byts
[docs] async def has(self, sha256): ''' Check if the Axon has a file. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: boolean: True if the Axon has the file; false otherwise. ''' await self._reqUserAllowed(('axon', 'has')) return await self.cell.has(sha256)
[docs] async def size(self, sha256): ''' Get the size of a file in the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: int: The size of the file, in bytes. If not present, None is returned. ''' await self._reqUserAllowed(('axon', 'has')) return await self.cell.size(sha256)
[docs] async def hashset(self, sha256): ''' Calculate additional hashes for a file in the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: dict: A dictionary containing hashes of the file. ''' await self._reqUserAllowed(('axon', 'has')) return await self.cell.hashset(sha256)
[docs] async def hashes(self, offs, wait=False, timeout=None): ''' Yield hash rows for files that exist in the Axon in added order starting at an offset. Args: offs (int): The index offset. wait (boolean): Wait for new results and yield them in realtime. timeout (int): Max time to wait for new results. Yields: (int, (bytes, int)): An index offset and the file SHA-256 and size. ''' await self._reqUserAllowed(('axon', 'has')) async for item in self.cell.hashes(offs, wait=wait, timeout=timeout): yield item
[docs] async def history(self, tick, tock=None): ''' Yield hash rows for files that existing in the Axon after a given point in time. Args: tick (int): The starting time (in epoch milliseconds). tock (int): The ending time to stop iterating at (in epoch milliseconds). Yields: (int, (bytes, int)): A tuple containing time of the hash was added and the file SHA-256 and size. ''' await self._reqUserAllowed(('axon', 'has')) async for item in self.cell.history(tick, tock=tock): yield item
[docs] async def wants(self, sha256s): ''' Get a list of sha256 values the axon does not have from an input list. Args: sha256s (list): A list of sha256 values as bytes. Returns: list: A list of bytes containing the sha256 hashes the Axon does not have. ''' await self._reqUserAllowed(('axon', 'has')) return await self.cell.wants(sha256s)
[docs] async def put(self, byts): ''' Store bytes in the Axon. Args: byts (bytes): The bytes to store in the Axon. Notes: This API should not be used for files greater than 128 MiB in size. Returns: tuple(int, bytes): A tuple with the file size and sha256 hash of the bytes. ''' await self._reqUserAllowed(('axon', 'upload')) return await self.cell.put(byts)
[docs] async def puts(self, files): ''' Store a set of bytes in the Axon. Args: files (list): A list of bytes to store in the Axon. Notes: This API should not be used for storing more than 128 MiB of bytes at once. Returns: list(tuple(int, bytes)): A list containing tuples of file size and sha256 hash of the saved bytes. ''' await self._reqUserAllowed(('axon', 'upload')) return await self.cell.puts(files)
[docs] async def upload(self): ''' Get an Upload object. Notes: The UpLoad object should be used to manage uploads greater than 128 MiB in size. Examples: Use an UpLoad object to upload a file to the Axon:: async with axonProxy.upload() as upfd: # Assumes bytesGenerator yields bytes async for byts in bytsgenerator(): upfd.write(byts) upfd.save() Use a single UpLoad object to save multiple files:: async with axonProxy.upload() as upfd: for fp in file_paths: # Assumes bytesGenerator yields bytes async for byts in bytsgenerator(fp): upfd.write(byts) upfd.save() Returns: UpLoadShare: An Upload manager object. ''' await self._reqUserAllowed(('axon', 'upload')) return await UpLoadShare.anit(self.cell, self.link)
[docs] async def del_(self, sha256): ''' Remove the given bytes from the Axon by sha256. Args: sha256 (bytes): The sha256, in bytes, to remove from the Axon. Returns: boolean: True if the file is removed; false if the file is not present. ''' await self._reqUserAllowed(('axon', 'del')) return await self.cell.del_(sha256)
[docs] async def dels(self, sha256s): ''' Given a list of sha256 hashes, delete the files from the Axon. Args: sha256s (list): A list of sha256 hashes in bytes form. Returns: list: A list of booleans, indicating if the file was deleted or not. ''' await self._reqUserAllowed(('axon', 'del')) return await self.cell.dels(sha256s)
[docs] async def wget(self, url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None, proxy=None, ssl_opts=None): ''' Stream a file download directly into the Axon. Args: url (str): The URL to retrieve. params (dict): Additional parameters to add to the URL. headers (dict): Additional HTTP headers to add in the request. json: A JSON body which is included with the request. body: The body to be included in the request. method (str): The HTTP method to use. ssl (bool): Perform SSL verification. timeout (int): The timeout of the request, in seconds. ssl_opts (dict): Additional SSL/TLS options. Notes: The response body will be stored, regardless of the response code. The ``ok`` value in the response does not reflect that a status code, such as a 404, was encountered when retrieving the URL. The ssl_opts dictionary may contain the following values:: { 'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument. 'client_cert': <str> - PEM encoded full chain certificate for use in mTLS. 'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert. } The dictionary returned by this may contain the following values:: { 'ok': <boolean> - False if there were exceptions retrieving the URL. 'url': <str> - The URL retrieved (which could have been redirected). This is a url-decoded string. 'code': <int> - The response code. 'reason': <str> - The reason phrase for the HTTP status code. 'mesg': <str> - An error message if there was an exception when retrieving the URL. 'err': <tuple> - An error tuple if there was an exception when retrieving the URL. 'headers': <dict> - The response headers as a dictionary. 'size': <int> - The size in bytes of the response body. 'hashes': { 'md5': <str> - The MD5 hash of the response body. 'sha1': <str> - The SHA1 hash of the response body. 'sha256': <str> - The SHA256 hash of the response body. 'sha512': <str> - The SHA512 hash of the response body. }, 'request': { 'url': The request URL. This is a url-decoded string. 'headers': The request headers. 'method': The request method. } 'history': A sequence of response bodies to track any redirects, not including hashes. } Returns: dict: An information dictionary containing the results of the request. ''' await self._reqUserAllowed(('axon', 'wget')) return await self.cell.wget(url, params=params, headers=headers, json=json, body=body, method=method, ssl=ssl, timeout=timeout, proxy=proxy, ssl_opts=ssl_opts)
[docs] async def postfiles(self, fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None, proxy=None, ssl_opts=None): await self._reqUserAllowed(('axon', 'wput')) return await self.cell.postfiles(fields, url, params=params, headers=headers, method=method, ssl=ssl, timeout=timeout, proxy=proxy, ssl_opts=ssl_opts)
[docs] async def wput(self, sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None, proxy=None, ssl_opts=None): await self._reqUserAllowed(('axon', 'wput')) return await self.cell.wput(sha256, url, params=params, headers=headers, method=method, ssl=ssl, timeout=timeout, proxy=proxy, ssl_opts=ssl_opts)
[docs] async def metrics(self): ''' Get the runtime metrics of the Axon. Returns: dict: A dictionary of runtime data about the Axon. ''' await self._reqUserAllowed(('axon', 'has')) return await self.cell.metrics()
[docs] async def iterMpkFile(self, sha256): ''' Yield items from a MsgPack (.mpk) file in the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. Yields: Unpacked items from the bytes. ''' await self._reqUserAllowed(('axon', 'get')) async for item in self.cell.iterMpkFile(sha256): yield item
[docs] async def readlines(self, sha256, errors='ignore'): ''' Yield lines from a multi-line text file in the axon. Args: sha256 (bytes): The sha256 hash of the file. errors (str): Specify how encoding errors should handled. Yields: str: Lines of text ''' await self._reqUserAllowed(('axon', 'get')) async for item in self.cell.readlines(sha256, errors=errors): yield item
[docs] async def csvrows(self, sha256, dialect='excel', errors='ignore', **fmtparams): ''' Yield CSV rows from a CSV file. Args: sha256 (bytes): The sha256 hash of the file. dialect (str): The CSV dialect to use. errors (str): Specify how encoding errors should handled. **fmtparams: The CSV dialect format parameters. Notes: The dialect and fmtparams expose the Python csv.reader() parameters. Examples: Get the rows from a CSV file and process them:: async for row in axon.csvrows(sha256): await dostuff(row) Get the rows from a tab separated file and process them:: async for row in axon.csvrows(sha256, delimiter='\t'): await dostuff(row) Yields: list: Decoded CSV rows. ''' await self._reqUserAllowed(('axon', 'get')) async for item in self.cell.csvrows(sha256, dialect, errors=errors, **fmtparams): yield item
[docs] async def jsonlines(self, sha256, errors='ignore'): ''' Yield JSON objects from JSONL (JSON lines) file. Args: sha256 (bytes): The sha256 hash of the file. errors (str): Specify how encoding errors should handled. Yields: object: Decoded JSON objects. ''' await self._reqUserAllowed(('axon', 'get')) async for item in self.cell.jsonlines(sha256, errors=errors): yield item
[docs] class Axon(s_cell.Cell): cellapi = AxonApi byterange = False confdefs = { 'max:bytes': { 'description': 'The maximum number of bytes that can be stored in the Axon.', 'type': 'integer', 'minimum': 1, 'hidecmdl': True, }, 'max:count': { 'description': 'The maximum number of files that can be stored in the Axon.', 'type': 'integer', 'minimum': 1, 'hidecmdl': True, }, 'http:proxy': { 'description': 'An aiohttp-socks compatible proxy URL to use in the wget API.', 'type': 'string', }, 'tls:ca:dir': { 'description': 'An optional directory of CAs which are added to the TLS CA chain for wget and wput APIs.', 'type': 'string', }, }
[docs] async def initServiceStorage(self): # type: ignore path = s_common.gendir(self.dirn, 'axon.lmdb') self.axonslab = await s_lmdbslab.Slab.anit(path) self.sizes = self.axonslab.initdb('sizes') self.onfini(self.axonslab.fini) self.hashlocks = {} self.axonhist = s_lmdbslab.Hist(self.axonslab, 'history') self.axonseqn = s_slabseqn.SlabSeqn(self.axonslab, 'axonseqn') self.axonmetrics = await self.axonslab.getHotCount('metrics') if self.inaugural: self.axonmetrics.set('size:bytes', 0) self.axonmetrics.set('file:count', 0) await self._bumpCellVers('axon:metrics', ( (1, self._migrateAxonMetrics), ), nexs=False) self.maxbytes = self.conf.get('max:bytes') self.maxcount = self.conf.get('max:count') # modularize blob storage await self._initBlobStor()
[docs] async def initServiceRuntime(self): # share ourself via the cell dmon as "axon" # for potential default remote use self.dmon.share('axon', self) self._initAxonHttpApi() self.addHealthFunc(self._axonHealth)
[docs] async def getCellInfo(self): info = await s_cell.Cell.getCellInfo(self) info['features']['byterange'] = self.byterange return info
[docs] @contextlib.asynccontextmanager async def holdHashLock(self, hashbyts): ''' A context manager that synchronizes edit access to a blob. Args: hashbyts (bytes): The blob to hold the lock for. ''' item = self.hashlocks.get(hashbyts) if item is None: self.hashlocks[hashbyts] = item = [0, asyncio.Lock()] item[0] += 1 try: async with item[1]: yield finally: item[0] -= 1 if item[0] == 0: self.hashlocks.pop(hashbyts, None)
def _reqBelowLimit(self): if (self.maxbytes is not None and self.maxbytes <= self.axonmetrics.get('size:bytes')): mesg = f'Axon is at size:bytes limit: {self.maxbytes}' raise s_exc.HitLimit(mesg=mesg) if (self.maxcount is not None and self.maxcount <= self.axonmetrics.get('file:count')): mesg = f'Axon is at file:count limit: {self.maxcount}' raise s_exc.HitLimit(mesg=mesg) async def _axonHealth(self, health): health.update('axon', 'nominal', '', data=await self.metrics()) async def _migrateAxonMetrics(self): logger.warning('migrating Axon metrics data out of hive') async with await self.hive.open(('axon', 'metrics')) as hivenode: axonmetrics = await hivenode.dict() self.axonmetrics.set('size:bytes', axonmetrics.get('size:bytes', 0)) self.axonmetrics.set('file:count', axonmetrics.get('file:count', 0)) logger.warning('...Axon metrics migration complete!') async def _initBlobStor(self): self.byterange = True path = s_common.gendir(self.dirn, 'blob.lmdb') self.blobslab = await s_lmdbslab.Slab.anit(path) self.blobs = self.blobslab.initdb('blobs') self.offsets = self.blobslab.initdb('offsets') self.metadata = self.blobslab.initdb('metadata') self.onfini(self.blobslab.fini) if self.inaugural: self._setStorVers(1) storvers = self._getStorVers() if storvers < 1: storvers = await self._setStorVers01() async def _setStorVers01(self): logger.warning('Updating Axon storage version (adding offset index). This may take a while.') offs = 0 cursha = b'' # TODO: need LMDB to support getting value size without getting value for lkey, byts in self.blobslab.scanByFull(db=self.blobs): await asyncio.sleep(0) blobsha = lkey[:32] if blobsha != cursha: offs = 0 cursha = blobsha offs += len(byts) self.blobslab.put(cursha + offs.to_bytes(8, 'big'), lkey[32:], db=self.offsets) return self._setStorVers(1) def _getStorVers(self): byts = self.blobslab.get(b'version', db=self.metadata) if not byts: return 0 return int.from_bytes(byts, 'big') def _setStorVers(self, version): self.blobslab.put(b'version', version.to_bytes(8, 'big'), db=self.metadata) return version def _initAxonHttpApi(self): self.addHttpApi('/api/v1/axon/files/del', AxonHttpDelV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/put', AxonHttpUploadV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/has/sha256/([0-9a-fA-F]{64}$)', AxonHttpHasV1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/by/sha256/([0-9a-fA-F]{64}$)', AxonHttpBySha256V1, {'cell': self}) self.addHttpApi('/api/v1/axon/files/by/sha256/(.*)', AxonHttpBySha256InvalidV1, {'cell': self}) def _addSyncItem(self, item, tick=None): self.axonhist.add(item, tick=tick) self.axonseqn.add(item) async def _reqHas(self, sha256): ''' Ensure a file exists; and return its size if so. Args: sha256 (bytes): The sha256 to check. Returns: int: Size of the file in bytes. Raises: NoSuchFile: If the file does not exist. ''' fsize = await self.size(sha256) if fsize is None: raise s_exc.NoSuchFile(mesg='Axon does not contain the requested file.', sha256=s_common.ehex(sha256)) return fsize
[docs] async def history(self, tick, tock=None): ''' Yield hash rows for files that existing in the Axon after a given point in time. Args: tick (int): The starting time (in epoch milliseconds). tock (int): The ending time to stop iterating at (in epoch milliseconds). Yields: (int, (bytes, int)): A tuple containing time of the hash was added and the file SHA-256 and size. ''' for item in self.axonhist.carve(tick, tock=tock): yield item
[docs] async def hashes(self, offs, wait=False, timeout=None): ''' Yield hash rows for files that exist in the Axon in added order starting at an offset. Args: offs (int): The index offset. wait (boolean): Wait for new results and yield them in realtime. timeout (int): Max time to wait for new results. Yields: (int, (bytes, int)): An index offset and the file SHA-256 and size. Note: If the same hash was deleted and then added back, the same hash will be yielded twice. ''' async for item in self.axonseqn.aiter(offs, wait=wait, timeout=timeout): if self.axonslab.has(item[1][0], db=self.sizes): yield item await asyncio.sleep(0)
[docs] async def get(self, sha256, offs=None, size=None): ''' Get bytes of a file. Args: sha256 (bytes): The sha256 hash of the file in bytes. offs (int): The offset to start reading from. size (int): The total number of bytes to read. Examples: Get the bytes from an Axon and process them:: buf = b'' async for bytz in axon.get(sha256): buf =+ bytz await dostuff(buf) Yields: bytes: Chunks of the file bytes. Raises: synapse.exc.NoSuchFile: If the file does not exist. ''' fsize = await self._reqHas(sha256) fhash = s_common.ehex(sha256) logger.debug(f'Getting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash)) if offs is not None or size is not None: if not self.byterange: # pragma: no cover mesg = 'This axon does not support byte ranges.' raise s_exc.FeatureNotSupported(mesg=mesg) if offs < 0: raise s_exc.BadArg(mesg='Offs must be >= 0', offs=offs) if size < 1: raise s_exc.BadArg(mesg='Size must be >= 1', size=size) if offs >= fsize: # If we try to read past the file, yield empty bytes and return. yield b'' return async for byts in self._getBytsOffsSize(sha256, offs, size): yield byts else: async for byts in self._get(sha256): yield byts
async def _get(self, sha256): for _, byts in self.blobslab.scanByPref(sha256, db=self.blobs): yield byts
[docs] async def put(self, byts): ''' Store bytes in the Axon. Args: byts (bytes): The bytes to store in the Axon. Notes: This API should not be used for files greater than 128 MiB in size. Returns: tuple(int, bytes): A tuple with the file size and sha256 hash of the bytes. ''' # Use a UpLoad context manager so that we can # ensure that a one-shot set of bytes is chunked # in a consistent fashion. async with await self.upload() as fd: await fd.write(byts) return await fd.save()
[docs] async def puts(self, files): ''' Store a set of bytes in the Axon. Args: files (list): A list of bytes to store in the Axon. Notes: This API should not be used for storing more than 128 MiB of bytes at once. Returns: list(tuple(int, bytes)): A list containing tuples of file size and sha256 hash of the saved bytes. ''' return [await self.put(b) for b in files]
[docs] async def upload(self): ''' Get an Upload object. Notes: The UpLoad object should be used to manage uploads greater than 128 MiB in size. Examples: Use an UpLoad object to upload a file to the Axon:: async with await axon.upload() as upfd: # Assumes bytesGenerator yields bytes async for byts in bytsgenerator(): await upfd.write(byts) await upfd.save() Use a single UpLoad object to save multiple files:: async with await axon.upload() as upfd: for fp in file_paths: # Assumes bytesGenerator yields bytes async for byts in bytsgenerator(fp): await upfd.write(byts) await upfd.save() Returns: UpLoad: An Upload manager object. ''' return await UpLoad.anit(self)
[docs] async def has(self, sha256): ''' Check if the Axon has a file. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: boolean: True if the Axon has the file; false otherwise. ''' return self.axonslab.get(sha256, db=self.sizes) is not None
[docs] async def size(self, sha256): ''' Get the size of a file in the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: int: The size of the file, in bytes. If not present, None is returned. ''' byts = self.axonslab.get(sha256, db=self.sizes) if byts is not None: return int.from_bytes(byts, 'big')
[docs] async def hashset(self, sha256): ''' Calculate additional hashes for a file in the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. Returns: dict: A dictionary containing hashes of the file. ''' await self._reqHas(sha256) fhash = s_common.ehex(sha256) logger.debug(f'Getting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash)) hashset = s_hashset.HashSet() async for byts in self._get(sha256): hashset.update(byts) await asyncio.sleep(0) return dict([(n, s_common.ehex(h)) for (n, h) in hashset.digests()])
[docs] async def metrics(self): ''' Get the runtime metrics of the Axon. Returns: dict: A dictionary of runtime data about the Axon. ''' return self.axonmetrics.pack()
[docs] async def save(self, sha256, genr, size): ''' Save a generator of bytes to the Axon. Args: sha256 (bytes): The sha256 hash of the file in bytes. genr: The bytes generator function. Returns: int: The size of the bytes saved. ''' assert genr is not None and isinstance(size, int) return await self._populate(sha256, genr, size)
async def _populate(self, sha256, genr, size): ''' Populates the metadata and save the data itself if genr is not None ''' assert genr is not None and isinstance(size, int) self._reqBelowLimit() async with self.holdHashLock(sha256): byts = self.axonslab.get(sha256, db=self.sizes) if byts is not None: return int.from_bytes(byts, 'big') fhash = s_common.ehex(sha256) logger.debug(f'Saving blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash)) size = await self._saveFileGenr(sha256, genr, size) await self._axonFileAdd(sha256, size, {'tick': s_common.now()}) return size @s_nexus.Pusher.onPushAuto('axon:file:add') async def _axonFileAdd(self, sha256, size, info): byts = self.axonslab.get(sha256, db=self.sizes) if byts is not None: return False tick = info.get('tick') self._addSyncItem((sha256, size), tick=tick) self.axonmetrics.inc('file:count') self.axonmetrics.inc('size:bytes', valu=size) self.axonslab.put(sha256, size.to_bytes(8, 'big'), db=self.sizes) return True async def _saveFileGenr(self, sha256, genr, size): size = 0 for i, byts in enumerate(genr): size += len(byts) await self._axonBytsSave(sha256, i, size, byts) await asyncio.sleep(0) return size # a nexusified way to save local bytes @s_nexus.Pusher.onPushAuto('axon:bytes:add') async def _axonBytsSave(self, sha256, indx, offs, byts): ikey = indx.to_bytes(8, 'big') okey = offs.to_bytes(8, 'big') self.blobslab.put(sha256 + ikey, byts, db=self.blobs) self.blobslab.put(sha256 + okey, ikey, db=self.offsets) def _offsToIndx(self, sha256, offs): lkey = sha256 + offs.to_bytes(8, 'big') for offskey, indxbyts in self.blobslab.scanByRange(lkey, db=self.offsets): return int.from_bytes(offskey[32:], 'big'), indxbyts async def _getBytsOffs(self, sha256, offs): first = True boff, indxbyts = self._offsToIndx(sha256, offs) for bkey, byts in self.blobslab.scanByRange(sha256 + indxbyts, db=self.blobs): await asyncio.sleep(0) if bkey[:32] != sha256: return if first: first = False delt = boff - offs yield byts[-delt:] continue yield byts async def _getBytsOffsSize(self, sha256, offs, size): ''' Implementation dependent method to stream size # of bytes from the Axon, starting a given offset. ''' # This implementation assumes that the offs provided is < the maximum # size of the sha256 value being asked for. remain = size async for byts in self._getBytsOffs(sha256, offs): blen = len(byts) if blen >= remain: yield byts[:remain] return remain -= blen yield byts
[docs] async def dels(self, sha256s): ''' Given a list of sha256 hashes, delete the files from the Axon. Args: sha256s (list): A list of sha256 hashes in bytes form. Returns: list: A list of booleans, indicating if the file was deleted or not. ''' return [await self.del_(s) for s in sha256s]
[docs] async def del_(self, sha256): ''' Remove the given bytes from the Axon by sha256. Args: sha256 (bytes): The sha256, in bytes, to remove from the Axon. Returns: boolean: True if the file is removed; false if the file is not present. ''' if not await self.has(sha256): return False return await self._axonFileDel(sha256)
@s_nexus.Pusher.onPushAuto('axon:file:del') async def _axonFileDel(self, sha256): async with self.holdHashLock(sha256): byts = self.axonslab.pop(sha256, db=self.sizes) if not byts: return False fhash = s_common.ehex(sha256) logger.debug(f'Deleting blob [{fhash}].', extra=await self.getLogExtra(sha256=fhash)) size = int.from_bytes(byts, 'big') self.axonmetrics.inc('file:count', valu=-1) self.axonmetrics.inc('size:bytes', valu=-size) await self._delBlobByts(sha256) return True async def _delBlobByts(self, sha256): # remove the offset indexes... for lkey in self.blobslab.scanKeysByPref(sha256, db=self.blobs): self.blobslab.delete(lkey, db=self.offsets) await asyncio.sleep(0) # remove the actual blobs... for lkey in self.blobslab.scanKeysByPref(sha256, db=self.blobs): self.blobslab.delete(lkey, db=self.blobs) await asyncio.sleep(0)
[docs] async def wants(self, sha256s): ''' Get a list of sha256 values the axon does not have from a input list. Args: sha256s (list): A list of sha256 values as bytes. Returns: list: A list of bytes containing the sha256 hashes the Axon does not have. ''' return [s for s in sha256s if not await self.has(s)]
[docs] async def iterMpkFile(self, sha256): ''' Yield items from a MsgPack (.mpk) file in the Axon. Args: sha256 (str): The sha256 hash of the file as a string. Yields: Unpacked items from the bytes. ''' unpk = s_msgpack.Unpk() async for byts in self.get(s_common.uhex(sha256)): for _, item in unpk.feed(byts): yield item
async def _sha256ToLink(self, sha256, link): try: async for byts in self.get(sha256): await link.send(byts) await asyncio.sleep(0) finally: link.txfini()
[docs] async def readlines(self, sha256, errors='ignore'): sha256 = s_common.uhex(sha256) await self._reqHas(sha256) link00, sock00 = await s_link.linksock(forceclose=True) feedtask = None try: todo = s_common.todo(_spawn_readlines, sock00, errors=errors) async with await s_base.Base.anit() as scope: scope.schedCoro(s_coro.spawn(todo, log_conf=await self._getSpawnLogConf())) feedtask = scope.schedCoro(self._sha256ToLink(sha256, link00)) while not self.isfini: mesg = await link00.rx() if mesg is None: return line = s_common.result(mesg) if line is None: return yield line.rstrip('\n') finally: sock00.close() await link00.fini() if feedtask is not None: await feedtask
[docs] async def csvrows(self, sha256, dialect='excel', errors='ignore', **fmtparams): await self._reqHas(sha256) if dialect not in csv.list_dialects(): raise s_exc.BadArg(mesg=f'Invalid CSV dialect, use one of {csv.list_dialects()}') link00, sock00 = await s_link.linksock(forceclose=True) feedtask = None try: todo = s_common.todo(_spawn_readrows, sock00, dialect, fmtparams, errors=errors) async with await s_base.Base.anit() as scope: scope.schedCoro(s_coro.spawn(todo, log_conf=await self._getSpawnLogConf())) feedtask = scope.schedCoro(self._sha256ToLink(sha256, link00)) while not self.isfini: mesg = await link00.rx() if mesg is None: return row = s_common.result(mesg) if row is None: return yield row finally: sock00.close() await link00.fini() if feedtask is not None: await feedtask
[docs] async def jsonlines(self, sha256, errors='ignore'): async for line in self.readlines(sha256, errors=errors): line = line.strip() if not line: continue try: yield json.loads(line) except json.JSONDecodeError as e: logger.exception(f'Bad json line encountered for {sha256}') raise s_exc.BadJsonText(mesg=f'Bad json line encountered while processing {sha256}, ({e})', sha256=sha256) from None
[docs] async def postfiles(self, fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None, proxy=None, ssl_opts=None): ''' Send files from the axon as fields in a multipart/form-data HTTP request. Args: fields (list): List of dicts containing the fields to add to the request as form-data. url (str): The URL to retrieve. params (dict): Additional parameters to add to the URL. headers (dict): Additional HTTP headers to add in the request. method (str): The HTTP method to use. ssl (bool): Perform SSL verification. timeout (int): The timeout of the request, in seconds. proxy (bool|str|null): Use a specific proxy or disable proxy use. ssl_opts (dict): Additional SSL/TLS options. Notes: The dictionaries in the fields list may contain the following values:: { 'name': <str> - Name of the field. 'sha256': <str> - SHA256 hash of the file to submit for this field. 'value': <str> - Value for the field. Ignored if a sha256 has been specified. 'filename': <str> - Optional filename for the field. 'content_type': <str> - Optional content type for the field. 'content_transfer_encoding': <str> - Optional content-transfer-encoding header for the field. } The ssl_opts dictionary may contain the following values:: { 'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument. 'client_cert': <str> - PEM encoded full chain certificate for use in mTLS. 'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert. } The dictionary returned by this may contain the following values:: { 'ok': <boolean> - False if there were exceptions retrieving the URL. 'err': <tuple> - Tuple of the error type and information if an exception occurred. 'url': <str> - The URL retrieved (which could have been redirected) 'code': <int> - The response code. 'body': <bytes> - The response body. 'reason': <str> - The reason phrase for the HTTP status code. 'headers': <dict> - The response headers as a dictionary. } Returns: dict: An information dictionary containing the results of the request. ''' if proxy is None: proxy = self.conf.get('http:proxy') ssl = self.getCachedSslCtx(opts=ssl_opts, verify=ssl) connector = None if proxy: connector = aiohttp_socks.ProxyConnector.from_url(proxy) atimeout = aiohttp.ClientTimeout(total=timeout) async with aiohttp.ClientSession(connector=connector, timeout=atimeout) as sess: try: data = aiohttp.FormData() data._is_multipart = True for field in fields: name = field.get('name') if not isinstance(name, str): mesg = f'Each field requires a "name" key with a string value: {name}' raise s_exc.BadArg(mesg=mesg, name=name) sha256 = field.get('sha256') if sha256: sha256b = s_common.uhex(sha256) await self._reqHas(sha256b) valu = self.get(sha256b) else: valu = field.get('value') if not isinstance(valu, (bytes, str)): valu = json.dumps(valu) data.add_field(name, valu, content_type=field.get('content_type'), filename=field.get('filename'), content_transfer_encoding=field.get('content_transfer_encoding')) async with sess.request(method, url, headers=headers, params=params, data=data, ssl=ssl) as resp: info = { 'ok': True, 'url': str(resp.url), 'code': resp.status, 'body': await resp.read(), 'reason': s_common.httpcodereason(resp.status), 'headers': dict(resp.headers), } return info except Exception as e: logger.exception(f'Error POSTing files to [{s_urlhelp.sanitizeUrl(url)}]') err = s_common.err(e) errmsg = err[1].get('mesg') if errmsg: reason = f'Exception occurred during request: {err[0]}: {errmsg}' else: reason = f'Exception occurred during request: {err[0]}' return { 'ok': False, 'err': err, 'url': url, 'body': b'', 'code': -1, 'reason': reason, 'headers': dict(), }
[docs] async def wput(self, sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None, filename=None, filemime=None, proxy=None, ssl_opts=None): ''' Stream a blob from the axon as the body of an HTTP request. ''' if proxy is None: proxy = self.conf.get('http:proxy') ssl = self.getCachedSslCtx(opts=ssl_opts, verify=ssl) connector = None if proxy: connector = aiohttp_socks.ProxyConnector.from_url(proxy) atimeout = aiohttp.ClientTimeout(total=timeout) async with aiohttp.ClientSession(connector=connector, timeout=atimeout) as sess: try: await self._reqHas(sha256) async with sess.request(method, url, headers=headers, params=params, data=self.get(sha256), ssl=ssl) as resp: info = { 'ok': True, 'url': str(resp.url), 'code': resp.status, 'reason': s_common.httpcodereason(resp.status), 'headers': dict(resp.headers), } return info except Exception as e: logger.exception(f'Error streaming [{sha256}] to [{s_urlhelp.sanitizeUrl(url)}]') err = s_common.err(e) errmsg = err[1].get('mesg') if errmsg: mesg = f"{err[0]}: {errmsg}" reason = f'Exception occurred during request: {err[0]}: {errmsg}' else: mesg = err[0] reason = f'Exception occurred during request: {err[0]}' return { 'ok': False, 'mesg': mesg, 'code': -1, 'reason': reason, 'err': err, }
def _flatten_clientresponse(self, resp: aiohttp.ClientResponse, ) -> dict: info = { 'ok': True, 'url': str(resp.real_url), 'code': resp.status, 'headers': dict(resp.headers), 'reason': s_common.httpcodereason(resp.status), 'request': { 'url': str(resp.request_info.real_url), 'headers': dict(resp.request_info.headers), 'method': str(resp.request_info.method), } } if resp.history: info['history'] = [self._flatten_clientresponse(hist) for hist in resp.history] return info
[docs] async def wget(self, url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None, proxy=None, ssl_opts=None): ''' Stream a file download directly into the Axon. Args: url (str): The URL to retrieve. params (dict): Additional parameters to add to the URL. headers (dict): Additional HTTP headers to add in the request. json: A JSON body which is included with the request. body: The body to be included in the request. method (str): The HTTP method to use. ssl (bool): Perform SSL verification. timeout (int): The timeout of the request, in seconds. proxy (bool|str|null): Use a specific proxy or disable proxy use. ssl_opts (dict): Additional SSL/TLS options. Notes: The response body will be stored, regardless of the response code. The ``ok`` value in the response does not reflect that a status code, such as a 404, was encountered when retrieving the URL. The ssl_opts dictionary may contain the following values:: { 'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument. 'client_cert': <str> - PEM encoded full chain certificate for use in mTLS. 'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert. } The dictionary returned by this may contain the following values:: { 'ok': <boolean> - False if there were exceptions retrieving the URL. 'url': <str> - The URL retrieved (which could have been redirected). This is a url-decoded string. 'code': <int> - The response code. 'reason': <str> - The reason phrase for the HTTP status code. 'mesg': <str> - An error message if there was an exception when retrieving the URL. 'err': <tuple> - An error tuple if there was an exception when retrieving the URL. 'headers': <dict> - The response headers as a dictionary. 'size': <int> - The size in bytes of the response body. 'hashes': { 'md5': <str> - The MD5 hash of the response body. 'sha1': <str> - The SHA1 hash of the response body. 'sha256': <str> - The SHA256 hash of the response body. 'sha512': <str> - The SHA512 hash of the response body. }, 'request': { 'url': The request URL. This is a url-decoded string. 'headers': The request headers. 'method': The request method. } 'history': A sequence of response bodies to track any redirects, not including hashes. } Returns: dict: An information dictionary containing the results of the request. ''' logger.debug(f'Wget called for [{url}].', extra=await self.getLogExtra(url=s_urlhelp.sanitizeUrl(url))) if proxy is None: proxy = self.conf.get('http:proxy') ssl = self.getCachedSslCtx(opts=ssl_opts, verify=ssl) connector = None if proxy: connector = aiohttp_socks.ProxyConnector.from_url(proxy) atimeout = aiohttp.ClientTimeout(total=timeout) async with aiohttp.ClientSession(connector=connector, timeout=atimeout) as sess: try: async with sess.request(method, url, headers=headers, params=params, json=json, data=body, ssl=ssl) as resp: info = self._flatten_clientresponse(resp) hashset = s_hashset.HashSet() async with await self.upload() as upload: async for byts in resp.content.iter_chunked(CHUNK_SIZE): await upload.write(byts) hashset.update(byts) size, _ = await upload.save() info['size'] = size info['hashes'] = dict([(n, s_common.ehex(h)) for (n, h) in hashset.digests()]) return info except asyncio.CancelledError: raise except Exception as e: logger.exception(f'Failed to wget {s_urlhelp.sanitizeUrl(url)}') err = s_common.err(e) errmsg = err[1].get('mesg') if errmsg: mesg = f"{err[0]}: {errmsg}" reason = f'Exception occurred during request: {err[0]}: {errmsg}' else: mesg = err[0] reason = f'Exception occurred during request: {err[0]}' return { 'ok': False, 'mesg': mesg, 'code': -1, 'reason': reason, 'err': err, }
def _spawn_readlines(sock, errors='ignore'): # pragma: no cover try: with sock.makefile('r', errors=errors) as fd: try: for line in fd: sock.sendall(s_msgpack.en((True, line))) sock.sendall(s_msgpack.en((True, None))) except UnicodeDecodeError as e: raise s_exc.BadDataValu(mesg=str(e)) except Exception as e: mesg = s_common.retnexc(e) sock.sendall(s_msgpack.en(mesg)) def _spawn_readrows(sock, dialect, fmtparams, errors='ignore'): # pragma: no cover try: # Assume utf8 encoding and ignore errors. with sock.makefile('r', errors=errors) as fd: try: for row in csv.reader(fd, dialect, **fmtparams): sock.sendall(s_msgpack.en((True, row))) sock.sendall(s_msgpack.en((True, None))) except TypeError as e: raise s_exc.BadArg(mesg=f'Invalid csv format parameter: {str(e)}') except UnicodeDecodeError as e: raise s_exc.BadDataValu(mesg=str(e)) except csv.Error as e: mesg = f'CSV error: {str(e)}' raise s_exc.BadDataValu(mesg=mesg) except Exception as e: mesg = s_common.retnexc(e) sock.sendall(s_msgpack.en(mesg))