Source code for synapse.tools.feed

import os
import sys
import time
import asyncio
import logging
import argparse

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.cortex as s_cortex
import synapse.telepath as s_telepath

import synapse.lib.cmdr as s_cmdr
import synapse.lib.output as s_output
import synapse.lib.msgpack as s_msgpack
import synapse.lib.version as s_version
import synapse.lib.encoding as s_encoding

logger = logging.getLogger(__name__)

reqver = '>=0.2.0,<3.0.0'

[docs]def getItems(*paths): items = [] for path in paths: if path.endswith('.json'): item = s_common.jsload(path) if not isinstance(item, list): item = [item] items.append((path, item)) elif path.endswith('.jsonl'): with s_common.genfile(path) as fd: item = list(s_encoding.iterdata(fd, False, format='jsonl')) items.append((path, item)) elif path.endswith(('.yaml', '.yml')): item = s_common.yamlload(path) if not isinstance(item, list): item = [item] items.append((path, item)) elif path.endswith('.mpk') or path.endswith('.nodes'): genr = s_msgpack.iterfile(path) items.append((path, genr)) else: # pragma: no cover logger.warning('Unsupported file path: [%s]', path) return items
[docs]async def addFeedData(core, outp, feedformat, debug=False, *paths, chunksize=1000, offset=0, viewiden=None): items = getItems(*paths) for path, item in items: bname = os.path.basename(path) tick = time.time() outp.printf(f'Adding items from [{path}]') foff = 0 for chunk in s_common.chunks(item, chunksize): clen = len(chunk) if offset and foff + clen < offset: # We have not yet encountered a chunk which # will include the offset size. foff += clen continue await core.addFeedData(feedformat, chunk, viewiden=viewiden) foff += clen outp.printf(f'Added [{clen}] items from [{bname}] - offset [{foff}]') tock = time.time() outp.printf(f'Done consuming from [{bname}]') outp.printf(f'Took [{tock - tick}] seconds.') if debug: await s_cmdr.runItemCmdr(core, outp, True)
[docs]async def main(argv, outp=None): if outp is None: # pragma: no cover outp = s_output.OutPut() pars = makeargparser() opts = pars.parse_args(argv) if opts.offset: if len(opts.files) > 1: outp.printf('Cannot start from a arbitrary offset for more than 1 file.') return 1 outp.printf(f'Starting from offset [{opts.offset}] - it may take a while' f' to get to that location in the input file.') if opts.test: async with s_cortex.getTempCortex(mods=opts.modules) as prox: await addFeedData(prox, outp, opts.format, opts.debug, chunksize=opts.chunksize, offset=opts.offset, *opts.files) elif opts.cortex: async with s_telepath.withTeleEnv(): async with await s_telepath.openurl(opts.cortex) as core: try: s_version.reqVersion(core._getSynVers(), reqver) except s_exc.BadVersion as e: valu = s_version.fmtVersion(*e.get('valu')) outp.printf(f'Cortex version {valu} is outside of the feed tool supported range ({reqver}).') outp.printf(f'Please use a version of Synapse which supports {valu}; ' f'current version is {s_version.verstring}.') return 1 await addFeedData(core, outp, opts.format, opts.debug, chunksize=opts.chunksize, offset=opts.offset, viewiden=opts.view, *opts.files) else: # pragma: no cover outp.printf('No valid options provided [%s]', opts) return 1 return 0
[docs]def makeargparser(): desc = 'Command line tool for ingesting data into a cortex' pars = argparse.ArgumentParser('synapse.tools.feed', description=desc) muxp = pars.add_mutually_exclusive_group(required=True) muxp.add_argument('--cortex', '-c', type=str, help='Cortex to connect and add nodes too.') muxp.add_argument('--test', '-t', default=False, action='store_true', help='Perform a local ingest against a temporary cortex.') pars.add_argument('--debug', '-d', default=False, action='store_true', help='Drop to interactive prompt to inspect cortex after loading data.') pars.add_argument('--format', '-f', type=str, action='store', default='syn.nodes', help='Feed format to use for the ingested data.') pars.add_argument('--modules', '-m', type=str, action='append', default=[], help='Additional modules to load locally with a test Cortex.') pars.add_argument('--chunksize', type=int, action='store', default=1000, help='Default chunksize for iterating over items.') pars.add_argument('--offset', type=int, action='store', default=0, help='Item offset to start consuming data from.') pars.add_argument('--view', type=str, action='store', default=None, help='The View to ingest the data into.') pars.add_argument('files', nargs='*', help='json/yaml/msgpack feed files') return pars
if __name__ == '__main__': # pragma: no cover s_common.setlogging(logger, 'DEBUG') asyncio.run(main(sys.argv[1:]))