Source code for synapse.tools.cortex.layer.load

import os

import synapse.exc as s_exc
import synapse.telepath as s_telepath

import synapse.lib.cmd as s_cmd
import synapse.lib.time as s_time
import synapse.lib.output as s_output
import synapse.lib.msgpack as s_msgpack
import synapse.lib.version as s_version

descr = '''
Import node edits to a Synapse layer.
'''

[docs] async def importLayer(infiles, opts, outp): async with await s_telepath.openurl(opts.url) as cell: info = await cell.getCellInfo() if (celltype := info['cell']['type']) != 'cortex': mesg = f'Layer load tool only works on cortexes, not {celltype}.' raise s_exc.TypeMismatch(mesg=mesg) # Get the highest cellvers from all the input files reqver = max([infile[0].get('cellvers') for infile in infiles]) if (synver := info.get('cell').get('version')) < reqver: synstr = s_version.fmtVersion(*synver) reqstr = s_version.fmtVersion(*reqver) mesg = f'Synapse version mismatch ({synstr} < {reqstr}).' raise s_exc.BadVersion(mesg=mesg) async with await s_telepath.openurl(opts.url, name=f'*/layer/{opts.iden}') as layer: for header, filename, genr in infiles: soffs = header.get('offset') tick = header.get('tick') outp.printf(f'Loading {filename}, offset={soffs}, tick={s_time.repr(tick)}.') eoffs = soffs fini = None for item in genr: match item: case ('edit', (eoffs, edit, meta)): if opts.dryrun: continue await layer.saveNodeEdits(edit, meta=meta) case ('fini', info): fini = info break case _: mtype = item[0] mesg = f'Unexpected message type: {mtype}.' raise s_exc.BadMesgFormat(mesg=mesg) if fini is None: mesg = f'Incomplete/corrupt export: {filename}.' raise s_exc.BadDataValu(mesg=mesg) elif (offset := fini.get('offset')) != eoffs: mesg = f'Incomplete/corrupt export: {filename}. Expected offset {offset}, got {eoffs}.' raise s_exc.BadDataValu(mesg=mesg) else: outp.printf(f'Successfully loaded {filename} with {eoffs + 1 - soffs} edits ({soffs} - {eoffs}).')
[docs] async def main(argv, outp=s_output.stdout): pars = s_cmd.Parser(prog='layer.load', outp=outp, description=descr) pars.add_argument('--dryrun', action='store_true', help="Process files but don't apply changes.") pars.add_argument('--url', default='cell:///vertex/storage', help='The telepath URL of the Synapse service.') pars.add_argument('iden', help='The iden of the layer to import to.') pars.add_argument('files', nargs='+', help='The .nodeedits files to import from.') opts = pars.parse_args(argv) infiles = [] try: # Load the files for filename in opts.files: if not os.path.exists(filename) or not os.path.isfile(filename): mesg = f'Invalid input file specified: {filename}.' raise s_exc.NoSuchFile(mesg=mesg) genr = s_msgpack.iterfile(filename) header = next(genr) if header[0] != 'init': mesg = f'Invalid header in {filename}.' raise s_exc.BadMesgFormat(mesg=mesg) infiles.append((header[1], filename, genr)) # Sort the files based on their offset infiles = sorted(infiles, key=lambda x: x[0].get('offset')) outp.printf('Processing the following nodeedits:') outp.printf('Offset | Filename') outp.printf('-----------------|----------') for header, filename, genr in infiles: offset = header.get('offset') outp.printf(f'{offset:<16d} | {filename}') async with s_telepath.withTeleEnv(): await importLayer(infiles, opts, outp) return 0 except s_exc.SynErr as exc: mesg = exc.get('mesg') outp.printf(f'ERROR: {mesg}.') return 1 except Exception as exc: # pragma: no cover outp.printf(f'ERROR: {str(exc)}.') return 1
if __name__ == '__main__': # pragma: no cover s_cmd.exitmain(main)