import os
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.lib.cmd as s_cmd
import synapse.lib.output as s_output
import synapse.lib.msgpack as s_msgpack
# Description text handed to the layer.dump argument parser (description=descr in main()).
descr = '''
Export node edits from a Synapse layer.
'''
[docs]
async def exportLayer(opts, outp):
    '''
    Export node edits from one layer of a Cortex into ``.nodeedits`` files.

    Each output file is a sequence of msgpack encoded messages: an ``init``
    header, one ``edit`` message per item yielded by ``syncNodeEdits2()`` and
    a ``fini`` footer. When ``opts.chunksize`` is non-zero a new file is
    started once ``chunksize`` edits have been written to the current one.
    After every file the next offset to export is saved to the state file,
    which is also where the starting offset is read from when ``opts.offset``
    is None.

    Args:
        opts: Parsed arguments providing ``url``, ``iden``, ``outdir``, ``offset``, ``chunksize`` and ``statefile``.
        outp: Output object; ``printf()`` is called once for every file written.

    Returns:
        int: Always 0.

    Raises:
        s_exc.TypeMismatch: If the service at ``opts.url`` does not report a cell type of ``cortex``.
        s_exc.BadArg: If the starting offset is greater than or equal to ``layer.getEditIndx()``.
    '''
    async with await s_telepath.openurl(opts.url) as cell:

        info = await cell.getCellInfo()

        if (celltype := info['cell']['type']) != 'cortex':
            mesg = f'Layer dump tool only works on cortexes, not {celltype}'
            raise s_exc.TypeMismatch(mesg=mesg)

        celliden = info['cell']['iden']
        cellvers = info['cell']['version']

    # Find and read state file
    state = {}

    statefile = opts.statefile
    if statefile is None:
        statefile = s_common.genpath(opts.outdir, f'{celliden}.{opts.iden}.yaml')

    if (data := s_common.yamlload(statefile)) is not None:
        state = data

    # An explicitly requested offset takes precedence over the saved one
    if (soffs := opts.offset) is None:
        soffs = state.get('offset:next', 0)

    async with await s_telepath.openurl(opts.url, name=f'*/layer/{opts.iden}') as layer:

        # Handle no edits to export
        if soffs >= await layer.getEditIndx():
            mesg = f'No edits to export starting from offset ({soffs})'
            raise s_exc.BadArg(mesg=mesg)

        finished = False

        genr = layer.syncNodeEdits2(soffs, wait=False)
        nodeiter = aiter(genr)

        # One pass of this loop produces one output file
        while not finished:

            try:
                # Pull the first edit so we can get the starting offset
                first = await anext(nodeiter)
            except StopAsyncIteration:  # pragma: no cover
                break

            soffs = first[0]

            # The first edit is also the last one until more are written. Without
            # this a file holding a single edit would report a missing (None) or
            # stale end offset in its footer, its file name and the state file.
            eoffs = soffs

            with s_common.tmpfile(opts.outdir, prefix='layer.dump') as (fd, tmppath):

                # Write header to file
                fd.write(s_msgpack.en((
                    'init',
                    {
                        'hdrvers': 1,
                        'celliden': celliden,
                        'cellvers': cellvers,
                        'layriden': opts.iden,
                        'offset': soffs,
                        'chunksize': opts.chunksize,
                        'tick': s_common.now(),
                    }
                )))

                # Now write the first edit that we already pulled
                fd.write(s_msgpack.en(('edit', first)))

                count = 1

                while True:

                    # Check the chunk limit before pulling another edit so that
                    # a chunksize of 1 really yields one edit per file.
                    if opts.chunksize and count % opts.chunksize == 0:
                        break

                    try:
                        nodeedit = await anext(nodeiter)
                    except StopAsyncIteration:
                        # Nothing left to export; this is the last file
                        finished = True
                        break

                    # Write individual edits to file
                    fd.write(s_msgpack.en(('edit', nodeedit)))

                    eoffs = nodeedit[0]
                    count += 1

                # Write footer to file
                fd.write(s_msgpack.en((
                    'fini',
                    {
                        'offset': eoffs,
                        'tock': s_common.now(),
                    }
                )))

            # Give the finished file its final name, which encodes the offset range.
            # NOTE(review): this runs after the tmpfile block has exited; confirm that
            # s_common.tmpfile leaves the file in place when the block succeeds.
            path = s_common.genpath(opts.outdir, f'{celliden}.{opts.iden}.{soffs}-{eoffs}.nodeedits')
            os.rename(tmppath, path)
            outp.printf(f'Wrote layer node edits {soffs}-{eoffs} to {path}.')

            # Save state file after each export file
            state['offset:next'] = eoffs + 1
            s_common.yamlsave(state, statefile)

    return 0
[docs]
async def main(argv, outp=s_output.stdout):
    '''
    Command line entry point for the layer.dump tool.

    Parses the arguments, makes sure the output directory exists and then
    runs exportLayer(), reporting any error through ``outp``.

    Args:
        argv (list): Command line arguments to parse.
        outp: Output object used for status and error messages.

    Returns:
        int: 0 when the export succeeded, 1 when an error was reported.
    '''
    parser = s_cmd.Parser(prog='layer.dump', outp=outp, description=descr)

    # (name, add_argument keyword arguments), registered in this order
    argspecs = (
        ('--url', {
            'default': 'cell:///vertex/storage',
            'help': 'The telepath URL of the Synapse service.',
        }),
        ('--offset', {
            'default': None,
            'type': int,
            'help': 'The starting offset of the node edits to export.',
        }),
        ('--chunksize', {
            'default': 0,
            'type': int,
            'help': 'The number of node edits to store in a single file. Zero to disable chunking.',
        }),
        ('--statefile', {
            'type': str,
            'default': None,
            'help': 'Path to the state tracking file for this layer dump.',
        }),
        ('iden', {
            'help': 'The iden of the layer to export.',
        }),
        ('outdir', {
            'help': 'The directory to save the exported node edits to.',
        }),
    )

    for argname, argkwargs in argspecs:
        parser.add_argument(argname, **argkwargs)

    opts = parser.parse_args(argv)

    # Refuse to continue when the output path is an existing non-directory
    blocked = os.path.exists(opts.outdir) and not os.path.isdir(opts.outdir)
    if blocked:
        mesg = f'Specified output directory {opts.outdir} exists but is not a directory.'
        outp.printf(f'ERROR: {mesg}')
        return 1

    os.makedirs(opts.outdir, exist_ok=True)

    async with s_telepath.withTeleEnv():

        try:
            await exportLayer(opts, outp)

        except s_exc.SynErr as exc:
            # Synapse errors carry their message in the 'mesg' field
            mesg = exc.get('mesg')
            outp.printf(f'ERROR: {mesg}.')
            return 1

        except Exception as exc:  # pragma: no cover
            mesg = str(exc)
            outp.printf(f'ERROR: {mesg}.')
            return 1

        else:
            outp.printf(f'Successfully exported layer {opts.iden}.')
            return 0
# Script entry point: hand main() to s_cmd.exitmain when run directly.
# NOTE(review): exitmain presumably runs the coroutine and exits the process
# with its return value -- confirm in synapse.lib.cmd.
if __name__ == '__main__': # pragma: no cover
    s_cmd.exitmain(main)