Source code for synapse.tools.axon.load

import tarfile

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath

import synapse.lib.cmd as s_cmd
import synapse.lib.const as s_const
import synapse.lib.output as s_output

descr = '''
Load blobs into a Synapse Axon.
'''

DEFAULT_CHUNK_SIZE = s_const.mebibyte * 16

[docs] async def loadBlobs(opts, outp, tarfiles): try: async with await s_telepath.openurl(opts.url) as axon: cellinfo = await axon.getCellInfo() celltype = cellinfo['cell']['type'] if "axon" not in celltype.lower(): mesg = f'Axon load tool only works on axons, not {celltype}' raise s_exc.TypeMismatch(mesg=mesg) for tarfile_path in tarfiles: outp.printf(f'Processing tar archive: {tarfile_path}') with tarfile.open(tarfile_path, 'r:gz') as tar: for member in tar: if not member.name.endswith('.blob'): continue sha2hex = member.name[:-5] try: sha256 = s_common.uhex(sha2hex) except Exception: outp.printf(f"Skipping invalid blob filename: {member.name}") continue if await axon.has(sha256): outp.printf(f"Skipping existing blob {sha2hex}") continue outp.printf(f"Loading blob {sha2hex} (size={member.size})") try: fobj = tar.extractfile(member) except OSError as e: outp.printf(f"WARNING: Error extracting {member.name}: {e}") continue if fobj is None: outp.printf(f"Failed to extract {member.name} from tar archive.") continue async with await axon.upload() as upfd: while True: chunk = fobj.read(DEFAULT_CHUNK_SIZE) if not chunk: break await upfd.write(chunk) await upfd.save() except s_exc.SynErr as exc: return (False, s_exc.reprexc(exc)) except Exception as e: mesg = f'Error {e} loading blobs into Axon url: {opts.url}' return (False, mesg) return (True, None)
[docs] async def main(argv, outp=s_output.stdout): pars = s_cmd.Parser(prog='synapse.tools.axon.load', outp=outp, description=descr) pars.add_argument('--url', default='cell:///vertex/storage', help='Telepath URL for the Axon.') pars.add_argument('files', nargs='+', help='List of .tar.gz files to import from.') opts = pars.parse_args(argv) tarfiles = sorted([f for f in opts.files if f.endswith('.tar.gz')]) async with s_telepath.withTeleEnv(): (ok, mesg) = await loadBlobs(opts, outp, tarfiles) if not ok: outp.printf(f'ERROR: {mesg}') return 1 outp.printf('Successfully loaded blobs.') return 0
if __name__ == '__main__': # pragma: no cover s_cmd.exitmain(main)