import os
import tarfile
import tempfile
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.lib.cmd as s_cmd
import synapse.lib.const as s_const
import synapse.lib.output as s_output
# Description text handed to the command line parser in main().
descr = '''
Dump blobs from a Synapse Axon.
'''

# Default value of --rotate-size: the number of blob bytes written to one
# archive before dumpBlobs() rotates to a new one.
DEFAULT_ROTATE_SIZE = 4 * s_const.gigabyte
def getTarName(celliden, start, end):
    '''
    Build the file name of a dump archive.

    Args:
        celliden: Iden of the cell the blobs came from; used as the name prefix.
        start (int): First offset covered by the archive.
        end (int): End offset of the archive (dumpBlobs passes -1 while the
            archive is still being written).

    Returns:
        str: ``<celliden>.<start>-<end>.tar.gz`` with both offsets zero-padded
        to 12 characters.
    '''
    template = '{}.{:012d}-{:012d}.tar.gz'
    return template.format(celliden, start, end)
async def dumpBlobs(opts, outp):
    '''
    Dump the blobs of an Axon into gzip-compressed tar archives.

    Connects to the Axon at opts.url, iterates its hashes starting at
    opts.offset and stores every blob as ``<sha256 hex>.blob`` in a .tar.gz
    archive inside opts.outdir. A new archive is started once the blob bytes
    written to the current one reach opts.rotate_size. After a fully
    successful run, the next offset to dump is saved under ``offset:next`` in
    the YAML state file. When no blob was found at or after opts.offset, the
    saved value is opts.offset itself so that a later run does not skip it.

    Args:
        opts: Parsed options providing url, outdir, offset, rotate_size and
            statefile. opts.statefile is overwritten with the resolved path of
            the state file.
        outp: Output object; progress is reported through outp.printf().

    Returns:
        tuple: (True, None) on success, otherwise (False, mesg) where mesg
        describes the failure.
    '''
    try:
        async with await s_telepath.openurl(opts.url) as axon:

            cellinfo = await axon.getCellInfo()
            celltype = cellinfo['cell']['type']
            if "axon" not in celltype.lower():
                mesg = f'Axon dump tool only works on axons, not {celltype}'
                raise s_exc.TypeMismatch(mesg=mesg)

            celliden = cellinfo['cell']['iden']

            if os.path.exists(opts.outdir) and not os.path.isdir(opts.outdir):
                raise s_exc.BadDataValu(mesg=f'Specified output directory {opts.outdir} exists but is not a directory.')

            os.makedirs(opts.outdir, exist_ok=True)

            start = opts.offset
            rotate_size = opts.rotate_size
            hashes_iter = axon.hashes(start)

            # Offset the next run should start from. It only advances when a
            # blob has actually been dumped, so an empty run records ``start``
            # rather than ``start + 1``.
            next_offset = start

            tar = None          # currently open archive, None between archives
            tar_path = None     # in-progress path of that archive
            file_start = start  # first offset stored in the current archive
            tar_size = 0        # blob bytes added to the current archive

            # Resolve the state file: default to <outdir>/<celliden>.yaml, and
            # treat a directory given by the caller as the place to put it.
            # NOTE(review): the directory check is assumed to apply only to a
            # caller-supplied path - confirm against the upstream source.
            if opts.statefile is None:
                statefile_path = os.path.join(opts.outdir, f'{celliden}.yaml')
            else:
                statefile_path = opts.statefile
                if os.path.isdir(statefile_path):
                    statefile_path = os.path.join(statefile_path, f'{celliden}.yaml')

            # Keep whatever an existing state file already holds.
            state = {}
            if os.path.isfile(statefile_path):
                if (data := s_common.yamlload(statefile_path)) is not None:
                    state = data

            opts.statefile = statefile_path

            try:
                async for (offs, (sha256, size)) in hashes_iter:

                    if tar is None:
                        # The end offset is unknown while writing, so the
                        # in-progress archive is named with an end of -1.
                        tar_path = os.path.join(opts.outdir, getTarName(celliden, offs, -1))
                        tar = tarfile.open(tar_path, 'w:gz')
                        file_start = offs
                        tar_size = 0

                    sha2hex = s_common.ehex(sha256)
                    outp.printf(f'Dumping blob {sha2hex} (size={size}, offs={offs})')

                    # Spool the blob to a temporary file first so its length
                    # can be checked before it is added to the archive.
                    with s_common.tmpfile(opts.outdir) as (tmpf, _):
                        total = 0
                        async for byts in axon.get(sha256):
                            tmpf.write(byts)
                            total += len(byts)

                        if total != size:
                            raise s_exc.BadDataValu(mesg=f'Blob size mismatch for {sha2hex}: expected {size}, got {total}')

                        tmpf.flush()
                        tmpf.seek(0)

                        tarinfo = tarfile.TarInfo(name=f"{sha2hex}.blob")
                        tarinfo.size = size
                        tar.addfile(tarinfo, tmpf)

                    next_offset = offs + 1
                    tar_size += size

                    if tar_size >= rotate_size:
                        outp.printf(f'Rotating to new .tar.gz file at offset {offs + 1}')
                        _finishTar(tar, tar_path, opts.outdir, celliden, file_start, next_offset)
                        tar = None
                        tar_path = None

                # Seal the last, partially filled archive.
                if tar is not None:
                    _finishTar(tar, tar_path, opts.outdir, celliden, file_start, next_offset)
                    tar = None
                    tar_path = None

                # Only reached when every blob was dumped successfully.
                state['offset:next'] = next_offset
                s_common.yamlsave(state, statefile_path)

            finally:
                # On failure, close and discard the archive that was still in
                # progress; archives that were already renamed are kept.
                if tar is not None:
                    tar.close()
                if tar_path and os.path.isfile(tar_path):
                    os.remove(tar_path)

    except s_exc.SynErr as exc:
        mesg = exc.get('mesg')
        return (False, mesg)

    except Exception as e:
        mesg = f'Error {e} dumping blobs from Axon url: {opts.url}'
        return (False, mesg)

    return (True, None)


def _finishTar(tar, tar_path, outdir, celliden, file_start, end):
    '''
    Close an in-progress archive and rename it to its final name.

    Args:
        tar (tarfile.TarFile): The open archive.
        tar_path (str): Current (in-progress) path of the archive.
        outdir (str): Directory that holds the archives.
        celliden: Iden used as the archive name prefix.
        file_start (int): First offset stored in the archive.
        end (int): Offset following the last blob stored in the archive.
    '''
    tar.close()
    final_name = os.path.join(outdir, getTarName(celliden, file_start, end))
    os.rename(tar_path, final_name)
async def main(argv, outp=s_output.stdout):
    '''
    Command line entry point for the Axon dump tool.

    Parses argv, runs dumpBlobs() inside a telepath environment and reports
    the outcome through outp.

    Args:
        argv (list): Command line arguments (without the program name).
        outp: Output object used for all messages; defaults to stdout.

    Returns:
        int: 0 when the dump succeeded, 1 when dumpBlobs() reported an error.
    '''
    pars = s_cmd.Parser(prog='synapse.tools.axon.dump', outp=outp, description=descr)
    pars.add_argument('--url', default='cell:///vertex/storage', help='Telepath URL for the Axon.')
    pars.add_argument('--offset', type=int, default=0, help='Starting offset in the Axon.')
    # The archives written by dumpBlobs() are .tar.gz files, so the help text
    # names that file type.
    pars.add_argument('--rotate-size', type=int, default=DEFAULT_ROTATE_SIZE,
                      help='Rotate to a new .tar.gz file after the current file exceeds this size in bytes (default: 4GB). '
                           'Note: files may exceed this size if a single blob is larger than the remaining space.')
    pars.add_argument('--statefile', type=str, default=None,
                      help='Path to the state tracking file for the Axon dump.')
    pars.add_argument('outdir', help='Directory to dump tar.gz files (required).')

    opts = pars.parse_args(argv)

    async with s_telepath.withTeleEnv():

        (ok, mesg) = await dumpBlobs(opts, outp)
        if not ok:
            outp.printf(f'ERROR: {mesg}')
            return 1

        outp.printf('Successfully dumped blobs.')

    return 0
# Script entry point: hand main() to the Synapse command runner when this
# module is executed directly.
if __name__ == '__main__':  # pragma: no cover
    s_cmd.exitmain(main)