Source code for synapse.tools.moduser

import sys
import yaml
import asyncio
import argparse

import synapse.common as s_common
import synapse.telepath as s_telepath

import synapse.lib.output as s_output

descr = '''
Add, modify, or list users of a Synapse service.
'''

[docs]def printuser(user, outp): admin = user.get('admin') authtype = user.get('type') outp.printf(f'User: {user.get("name")} ({user.get("iden")})') outp.printf('') outp.printf(f' Locked: {user.get("locked")}') outp.printf(f' Admin: {user.get("admin")}') outp.printf(f' Email: {user.get("email")}') outp.printf(' Rules:') for indx, rule in enumerate(user.get('rules')): outp.printf(f' [{str(indx).ljust(3)}] - {s_common.reprauthrule(rule)}') outp.printf('') outp.printf(' Roles:') for role in user.get('roles'): outp.printf(f' {role.get("iden")} - {role.get("name")}') outp.printf('') outp.printf(' Gates:') for gateiden, gateinfo in user.get('authgates', {}).items(): outp.printf(f' {gateiden}') outp.printf(f' Admin: {gateinfo.get("admin") == True}') for indx, rule in enumerate(gateinfo.get('rules', ())): outp.printf(f' [{str(indx).ljust(3)}] - {s_common.reprauthrule(rule)}')
[docs]async def main(argv, outp=s_output.stdout): pars = argparse.ArgumentParser(prog='moduser', description=descr) pars.add_argument('--svcurl', default='cell:///vertex/storage', help='The telepath URL of the Synapse service.') pars.add_argument('--add', default=False, action='store_true', help='Add the user if they do not already exist.') pars.add_argument('--del', dest='delete', default=False, action='store_true', help='Delete the user if they exist.') pars.add_argument('--list', default=False, action='store_true', help='List existing users of the service, or details of a specific user.') pars.add_argument('--admin', choices=('true', 'false'), default=None, help='Set the user admin status.') pars.add_argument('--passwd', action='store', type=str, help='A password to set for the user.') pars.add_argument('--email', action='store', type=str, help='An email to set for the user.') pars.add_argument('--locked', choices=('true', 'false'), default=None, help='Set the user locked status.') pars.add_argument('--grant', default=[], action='append', help='A role to grant to the user.') pars.add_argument('--revoke', default=[], action='append', help='A role to revoke from the user.') pars.add_argument('--allow', default=[], action='append', help='A permission string to allow for the user.') pars.add_argument('--deny', default=[], action='append', help='A permission string to deny for the user.') pars.add_argument('--gate', default=None, help='The iden of an auth gate to add/del rules or set admin status on.') pars.add_argument('username', nargs='?', help='The username to add/edit or show details.') opts = pars.parse_args(argv) if opts.add and opts.delete: outp.printf('ERROR: Cannot specify --add and --del together.') return 1 async with s_telepath.withTeleEnv(): async with await s_telepath.openurl(opts.svcurl) as cell: if opts.list: if opts.username: user = await cell.getUserDefByName(opts.username) if user is None: outp.printf(f'ERROR: User not found: {opts.username}') return 1 printuser(user, outp) else: outp.printf('Users:') for user in await cell.getUserDefs(): outp.printf(f' {user.get("name")}') return 0 elif opts.username is None: outp.printf(f'ERROR: A username argument is required when --list is not specified.') return 1 if opts.gate: gate = await cell.getAuthGate(opts.gate) if gate is None: outp.printf(f'ERROR: No auth gate found with iden: {opts.gate}') return 1 grants = [] revokes = [] for rolename in opts.grant: role = await cell.getRoleDefByName(rolename) if role is None: outp.printf(f'ERROR: Role not found: {rolename}') return 1 grants.append(role) for rolename in opts.revoke: role = await cell.getRoleDefByName(rolename) if role is None: outp.printf(f'ERROR: Role not found: {rolename}') return 1 revokes.append(role) user = await cell.getUserDefByName(opts.username) if user is None: if not opts.add: outp.printf(f'ERROR: User not found (need --add?): {opts.username}') return 1 outp.printf(f'Adding user: {opts.username}') user = await cell.addUser(opts.username) else: outp.printf(f'Modifying user: {opts.username}') useriden = user.get('iden') if not s_common.isguid(useriden): # pragma: no cover outp.printf(f'ERROR: Invalid useriden: {useriden}') return 1 if opts.delete: outp.printf(f'...deleting user: {opts.username}') await cell.delUser(useriden) return 0 if opts.admin is not None: admin = s_common.yamlloads(opts.admin) mesg = f'...setting admin: {opts.admin}' if opts.gate: mesg += f' on gate {opts.gate}' outp.printf(mesg) await cell.setUserAdmin(useriden, admin, gateiden=opts.gate) if opts.locked is not None: locked = s_common.yamlloads(opts.locked) outp.printf(f'...setting locked: {opts.locked}') await cell.setUserLocked(useriden, locked) if opts.passwd is not None: outp.printf(f'...setting passwd: {opts.passwd}') await cell.setUserPasswd(useriden, opts.passwd) if opts.email is not None: outp.printf(f'...setting email: {opts.email}') await cell.setUserEmail(useriden, opts.email) for role in grants: rolename = role.get('name') outp.printf(f'...granting role: {rolename}') await cell.addUserRole(useriden, role.get('iden')) for role in revokes: rolename = role.get('name') outp.printf(f'...revoking role: {rolename}') await cell.delUserRole(useriden, role.get('iden')) for allow in opts.allow: perm = allow.lower().split('.') mesg = f'...adding allow rule: {allow}' if opts.gate: mesg += f' on gate {opts.gate}' outp.printf(mesg) if not await cell.isUserAllowed(useriden, perm, gateiden=opts.gate): await cell.addUserRule(useriden, (True, perm), indx=0, gateiden=opts.gate) for deny in opts.deny: perm = deny.lower().split('.') mesg = f'...adding deny rule: {deny}' if opts.gate: mesg += f' on gate {opts.gate}' outp.printf(mesg) if await cell.isUserAllowed(useriden, perm, gateiden=opts.gate): await cell.addUserRule(useriden, (False, perm), indx=0, gateiden=opts.gate) return 0
if __name__ == '__main__': # pragma: no cover sys.exit(asyncio.run(main(sys.argv[1:])))