Source code for synapse.tools.aha.mirror

import sys
import asyncio
import argparse

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath

import synapse.lib.output as s_output
import synapse.lib.version as s_version

descr = '''
Query the Aha server for the service cluster status of mirrors.

Examples:

    python -m synapse.tools.aha.mirror --timeout 30

'''

[docs] async def get_cell_infos(prox, iden, members, timeout): cell_infos = {} if iden is not None: todo = s_common.todo('getCellInfo') async for svcname, (ok, info) in prox.callAhaPeerApi(iden, todo, timeout=timeout): if not ok: continue cell_infos[svcname] = info return cell_infos
[docs] def build_status_list(members, cell_infos): group_status = [] for svc in members: svcname = svc.get('name') svcinfo = svc.get('svcinfo', {}) status = { 'name': svcname, 'role': '<unknown>', 'online': str('online' in svcinfo), 'ready': 'True', 'host': svcinfo.get('urlinfo', {}).get('host', ''), 'port': str(svcinfo.get('urlinfo', {}).get('port', '')), 'version': '<unknown>', 'nexs_indx': 0 } if svcname in cell_infos: info = cell_infos[svcname] cell_info = info.get('cell', {}) status.update({ 'nexs_indx': cell_info.get('nexsindx', 0), 'role': 'follower' if cell_info.get('uplink') else 'leader', 'version': str(info.get('synapse', {}).get('verstring', '')), 'online': 'True', 'ready': str(cell_info.get('ready', False)) }) group_status.append(status) return group_status
[docs] def output_status(outp, vname, group_status): header = ' {:<40} {:<10} {:<8} {:<7} {:<16} {:<9} {:<12} {:<10}'.format( 'name', 'role', 'online', 'ready', 'host', 'port', 'version', 'nexus idx') outp.printf(header) outp.printf('#' * 120) outp.printf(vname) for status in group_status: if status['nexs_indx'] == 0: status['nexs_indx'] = '<unknown>' line = ' {name:<40} {role:<10} {online:<8} {ready:<7} {host:<16} {port:<9} {version:<12} {nexs_indx:<10}'.format(**status) outp.printf(line)
[docs] def check_sync_status(group_status): indices = {status['nexs_indx'] for status in group_status} known_count = sum(1 for status in group_status) return len(indices) == 1 and known_count == len(group_status)
[docs] def timeout_type(valu): try: ivalu = int(valu) if ivalu < 0: raise ValueError except ValueError: raise s_exc.BadArg(mesg=f"{valu} is not a valid non-negative integer") return ivalu
[docs] async def main(argv, outp=s_output.stdout): pars = argparse.ArgumentParser(prog='synapse.tools.aha.mirror', description=descr, formatter_class=argparse.RawDescriptionHelpFormatter) pars.add_argument('--url', default='cell:///vertex/storage', help='The telepath URL to connect to the AHA service.') pars.add_argument('--timeout', type=timeout_type, default=10, help='The timeout in seconds for individual service API calls') pars.add_argument('--wait', action='store_true', help='Whether to wait for the mirrors to sync.') opts = pars.parse_args(argv) async with s_telepath.withTeleEnv(): try: async with await s_telepath.openurl(opts.url) as prox: try: if not prox._hasTeleFeat('callpeers', vers=1): outp.printf(f'Service at {opts.url} does not support the required callpeers feature.') return 1 except s_exc.NoSuchMeth: outp.printf(f'Service at {opts.url} does not support the required callpeers feature.') return 1 classes = prox._getClasses() if 'synapse.lib.aha.AhaApi' not in classes: outp.printf(f'Service at {opts.url} is not an Aha server') return 1 virtual_services, member_servers = {}, {} async for svc in prox.getAhaSvcs(): name = svc.get('name', '') svcinfo = svc.get('svcinfo', {}) urlinfo = svcinfo.get('urlinfo', {}) hostname = urlinfo.get('hostname', '') if name != hostname: virtual_services[name] = svc else: member_servers[name] = svc mirror_groups = {} for vname, vsvc in virtual_services.items(): vsvc_info = vsvc.get('svcinfo', {}) vsvc_iden = vsvc_info.get('iden') vsvc_leader = vsvc_info.get('leader') vsvc_hostname = vsvc_info.get('urlinfo', {}).get('hostname', '') if not vsvc_iden or not vsvc_hostname or not vsvc_leader: continue primary_member = member_servers.get(vsvc_hostname) if not primary_member: continue members = [primary_member] + [ msvc for mname, msvc in member_servers.items() if mname != vsvc_hostname and msvc.get('svcinfo', {}).get('iden') == vsvc_iden and msvc.get('svcinfo', {}).get('leader') == vsvc_leader ] if len(members) > 1: mirror_groups[vname] = members outp.printf('Service Mirror Groups:') for vname, members in mirror_groups.items(): iden = members[0].get('svcinfo', {}).get('iden') cell_infos = await get_cell_infos(prox, iden, members, opts.timeout) group_status = build_status_list(members, cell_infos) output_status(outp, vname, group_status) if check_sync_status(group_status): outp.printf('Group Status: In Sync') else: outp.printf(f'Group Status: Out of Sync') if opts.wait: leader_nexs = None for status in group_status: if status['role'] == 'leader' and isinstance(status['nexs_indx'], int): leader_nexs = status['nexs_indx'] if leader_nexs is not None: while True: responses = [] todo = s_common.todo('waitNexsOffs', leader_nexs - 1, timeout=opts.timeout) async for svcname, (ok, info) in prox.callAhaPeerApi(iden, todo, timeout=opts.timeout): if ok and info: responses.append((svcname, info)) if len(responses) == len(members): cell_infos = await get_cell_infos(prox, iden, members, opts.timeout) group_status = build_status_list(members, cell_infos) outp.printf('\nUpdated status:') output_status(outp, vname, group_status) if check_sync_status(group_status): outp.printf('Group Status: In Sync') break return 0 except Exception as e: mesg = repr(e) if isinstance(e, s_exc.SynErr): mesg = e.errinfo.get('mesg', repr(e)) outp.printf(f'ERROR: {mesg}') return 1
if __name__ == '__main__': # pragma: no cover sys.exit(asyncio.run(main(sys.argv[1:])))