Source code for synapse.tools.modrole

import sys
import asyncio
import argparse

import synapse.common as s_common
import synapse.telepath as s_telepath

import synapse.lib.output as s_output

# Description text handed to argparse as the parser description in main().
descr = '''
Add or modify a role in a Synapse service.
'''

def printrole(role, outp):
    '''
    Print a human readable summary of a role definition.

    The role name and iden are printed first, followed by the role's
    rules and then, for each auth gate, the gate iden, its admin flag
    and the gate specific rules.

    Args:
        role (dict): The role definition. The keys read are ``name``,
            ``iden``, ``rules`` and ``authgates``.
        outp: An output object exposing a ``printf(mesg)`` method.

    Returns:
        None
    '''
    outp.printf(f'Role: {role.get("name")} ({role.get("iden")})')

    outp.printf('')
    outp.printf(' Rules:')
    # A role definition without a "rules" entry (or with rules=None) is
    # treated as having no rules rather than raising a TypeError.
    for indx, rule in enumerate(role.get('rules') or ()):
        outp.printf(f' [{str(indx).ljust(3)}] - {s_common.reprauthrule(rule)}')

    outp.printf('')
    outp.printf(' Gates:')
    for gateiden, gateinfo in role.get('authgates', {}).items():
        outp.printf(f' {gateiden}')
        # The equality comparison is deliberate: a missing admin value
        # is reported as False instead of None.
        outp.printf(f' Admin: {gateinfo.get("admin") == True}')  # noqa: E712
        for indx, rule in enumerate(gateinfo.get('rules', ())):
            outp.printf(f' [{str(indx).ljust(3)}] - {s_common.reprauthrule(rule)}')
async def main(argv, outp=s_output.stdout):
    '''
    Command line entry point for listing, adding, deleting or modifying a role.

    Args:
        argv (list): The command line arguments, without the program name.
        outp: An output object exposing a ``printf(mesg)`` method.

    Returns:
        int: 0 when the requested operation completed, 1 when an error
        message was printed.
    '''
    parser = argparse.ArgumentParser(prog='modrole', description=descr)
    parser.add_argument('--svcurl', default='cell:///vertex/storage',
                        help='The telepath URL of the Synapse service.')
    parser.add_argument('--add', default=False, action='store_true',
                        help='Add the role if they do not already exist.')
    parser.add_argument('--del', dest='delete', default=False, action='store_true',
                        help='Delete the role if it exists.')
    parser.add_argument('--list', default=False, action='store_true',
                        help='List existing roles, or rules of a specific role.')
    parser.add_argument('--allow', default=[], action='append',
                        help='A permission string to allow for the role.')
    parser.add_argument('--deny', default=[], action='append',
                        help='A permission string to deny for the role.')
    parser.add_argument('--gate', default=None,
                        help='The iden of an auth gate to add/del rules on.')
    parser.add_argument('rolename', nargs='?', help='The rolename to add/edit.')

    opts = parser.parse_args(argv)
    name = opts.rolename
    gateiden = opts.gate

    # Reject contradictory flags before any connection is attempted.
    if opts.add and opts.delete:
        outp.printf('ERROR: Cannot specify --add and --del together.')
        return 1

    async with s_telepath.withTeleEnv(), await s_telepath.openurl(opts.svcurl) as cell:

        if opts.list:
            # Without a role name, list every role; otherwise show one role.
            if not name:
                outp.printf('Roles:')
                for roledef in await cell.getRoleDefs():
                    outp.printf(f' {roledef.get("iden")} - {roledef.get("name")}')
                return 0

            roledef = await cell.getRoleDefByName(name)
            if roledef is None:
                outp.printf(f'ERROR: Role not found: {name}')
                return 1

            printrole(roledef, outp)
            return 0

        if name is None:
            outp.printf('ERROR: A rolename argument is required when --list is not specified.')
            return 1

        # Confirm the requested auth gate exists before touching the role.
        if gateiden:
            if await cell.getAuthGate(gateiden) is None:
                outp.printf(f'ERROR: No auth gate found with iden: {gateiden}')
                return 1

        roledef = await cell.getRoleDefByName(name)
        if roledef is None:
            if not opts.add:
                outp.printf(f'ERROR: Role not found (need --add?): {name}')
                return 1

            outp.printf(f'Adding role: {name}')
            roledef = await cell.addRole(name)
        else:
            outp.printf(f'Modifying role: {name}')

        roleiden = roledef.get('iden')

        if opts.delete:
            outp.printf(f'...deleting role: {name}')
            await cell.delRole(roleiden)
            return 0

        # Allow rules are processed first, then deny rules. A rule is only
        # inserted (at index 0) when the current answer from isRoleAllowed()
        # differs from the requested outcome.
        for isallow, kind, perms in ((True, 'allow', opts.allow), (False, 'deny', opts.deny)):
            for text in perms:
                perm = text.lower().split('.')

                mesg = f'...adding {kind} rule: {text}'
                if gateiden:
                    mesg = f'{mesg} on gate {gateiden}'
                outp.printf(mesg)

                allowed = await cell.isRoleAllowed(roleiden, perm, gateiden=gateiden)
                if bool(allowed) != isallow:
                    await cell.addRoleRule(roleiden, (isallow, perm), indx=0, gateiden=gateiden)

        return 0
if __name__ == '__main__':  # pragma: no cover
    # Run the async entry point and use its return value as the process exit status.
    exitcode = asyncio.run(main(sys.argv[1:]))
    sys.exit(exitcode)