Source code for synapse.lib.stormlib.auth

import copy
import asyncio

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.stormtypes as s_stormtypes

stormcmds = (
    {
        'name': 'auth.user.add',
        'descr': '''
            Add a user.

            Examples:

                // Add a user named "visi" with the email address "[email protected]"
                auth.user.add visi --email [email protected]
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the user.'}),
            ('--email', {'type': 'str', 'help': "The user's email address.", 'default': None}),
        ),
        'storm': '''
            $user = $lib.auth.users.add($cmdopts.name, email=$cmdopts.email)
            $lib.print('User ({name}) added with iden: {iden}', name=$user.name, iden=$user.iden)
        ''',
    },
    {
        'name': 'auth.user.list',
        'descr': '''
            List all users.

            Examples:

                // Display the list of all users
                auth.user.list
        ''',
        'storm': '''
            $users = ([])
            $locked = ([])
            for $user in $lib.auth.users.list() {
                if $user.locked { $locked.append($user.name) }
                else { $users.append($user.name) }
            }

            $lib.print("Users:")
            for $user in $lib.sorted($users) {
                $lib.print(`  {$user}`)
            }

            $lib.print("")
            $lib.print("Locked Users:")
            for $user in $lib.sorted($locked) {
                $lib.print(`  {$user}`)
            }
        ''',
    },
    {
        'name': 'auth.user.mod',
        'descr': '''
            Modify properties of a user.

            Examples:

                // Rename the user "foo" to "bar"
                auth.user.mod foo --name bar

                // Make the user "visi" an admin
                auth.user.mod visi --admin $lib.true

                // Unlock the user "visi" and set their email to "[email protected]"
                auth.user.mod visi --locked $lib.false --email [email protected]

                // Grant admin access to user visi for the current view
                auth.user.mod visi --admin $lib.true --gate $lib.view.get().iden

                // Revoke admin access to user visi for the current view
                auth.user.mod visi --admin $lib.false --gate $lib.view.get().iden
        ''',
        'cmdargs': (
            ('username', {'type': 'str', 'help': 'The name of the user.'}),
            ('--name', {'type': 'str', 'help': 'The new name for the user.'}),
            ('--email', {'type': 'str', 'help': 'The email address to set for the user.'}),
            ('--passwd', {'type': 'str', 'help': 'The new password for the user. This is best passed into the runtime as a variable.'}),
            ('--admin', {'type': 'bool', 'help': 'True to make the user and admin, false to remove their remove their admin status.'}),
            ('--gate', {'type': 'str', 'help': 'The auth gate iden to grant or revoke admin status on. Use in conjunction with `--admin <bool>`.'}),
            ('--locked', {'type': 'bool', 'help': 'True to lock the user, false to unlock them.'}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.username)
            if $user {
                if $cmdopts.name {
                    $user.name = $cmdopts.name
                    $lib.print(`User ({$cmdopts.username}) renamed to {$cmdopts.name}.`)
                }
                if $cmdopts.email {
                    $user.email = $cmdopts.email
                    $lib.print(`User ({$cmdopts.username}) email address set to {$cmdopts.email}.`)
                }
                if $cmdopts.passwd {
                    $user.setPasswd($cmdopts.passwd)
                    $lib.print(`User ({$cmdopts.username}) password updated.`)
                }
                if ($cmdopts.locked != $lib.null) {
                    $user.setLocked($cmdopts.locked)
                    $lib.print(`User ({$cmdopts.username}) locked status set to {$cmdopts.locked=1}.`)
                }
                if ($cmdopts.admin != $lib.null) {
                    $user.setAdmin($cmdopts.admin, gateiden=$cmdopts.gate)
                    if $cmdopts.gate {
                        $lib.print(`User ({$cmdopts.username}) admin status set to {$cmdopts.admin=1} for auth gate {$cmdopts.gate}.`)
                    } else {
                        $lib.print(`User ({$cmdopts.username}) admin status set to {$cmdopts.admin=1}.`)
                    }
                }
                if ($cmdopts.gate != $lib.null and $cmdopts.admin = $lib.null) {
                    $lib.exit('Granting/revoking admin status on an auth gate, requires the use of `--admin <true|false>` also.')
                }
            } else {
                $lib.warn(`User ({$cmdopts.username}) not found!`)
            }
        ''',
    },
    {
        'name': 'auth.role.add',
        'descr': '''
            Add a role.

            Examples:

                // Add a role named "ninjas"
                auth.role.add ninjas
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the role.'}),
        ),
        'storm': '''
            $role = $lib.auth.roles.add($cmdopts.name)
            $lib.print('Role ({name}) added with iden: {iden}', name=$role.name, iden=$role.iden)
        ''',
    },
    {
        'name': 'auth.role.list',
        'descr': '''
            List all roles.

            Examples:

                // Display the list of all roles
                auth.role.list
        ''',
        'storm': '''
            $roles = ([])
            for $role in $lib.auth.roles.list() {
                $roles.append($role.name)
            }

            $lib.print("Roles:")
            for $role in $lib.sorted($roles) {
                $lib.print(`  {$role}`)
            }
        ''',
    },
    {
        'name': 'auth.role.del',
        'descr': '''
            Delete a role.

            Examples:

                // Delete a role named "ninjas"
                auth.role.del ninjas
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the role.'}),
        ),
        'storm': '''
            $role = $lib.auth.roles.byname($cmdopts.name)
            if $role {
                $lib.auth.roles.del($role.iden)
                $lib.print(`Role ({$cmdopts.name}) deleted.`)
            } else {
                $lib.warn(`Role ({$cmdopts.name}) not found!`)
            }
        ''',
    },
    {
        'name': 'auth.role.mod',
        'descr': '''
            Modify properties of a role.

            Examples:

                // Rename the "ninjas" role to "admins"
                auth.role.mod ninjas --name admins
        ''',
        'cmdargs': (
            ('rolename', {'type': 'str', 'help': 'The name of the role.'}),
            ('--name', {'type': 'str', 'help': 'The new name for the role.'}),
        ),
        'storm': '''
            $role = $lib.auth.roles.byname($cmdopts.rolename)
            if $role {
                if $cmdopts.name {
                    $role.name = $cmdopts.name
                    $lib.print(`Role ({$cmdopts.rolename}) renamed to {$cmdopts.name}.`)
                }
            } else {
                $lib.warn(`Role ({$cmdopts.rolename}) not found!`)
            }
        ''',
    },
    {
        'name': 'auth.user.addrule',
        'descr': '''
            Add a rule to a user.

            Examples:

                // add an allow rule to the user "visi" for permission "foo.bar.baz"
                auth.user.addrule visi foo.bar.baz

                // add a deny rule to the user "visi" for permission "foo.bar.baz"
                auth.user.addrule visi "!foo.bar.baz"

                // add an allow rule to the user "visi" for permission "baz" at the first index.
                auth.user.addrule visi baz --index 0
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the user.'}),
            ('rule', {'type': 'str', 'help': 'The rule string.'}),
            ('--gate', {'type': 'str', 'help': 'The auth gate id to grant permission on.', 'default': None}),
            ('--index', {'type': 'int', 'help': 'Specify the rule location as a 0 based index.', 'default': None}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.name)
            $rule = $lib.auth.ruleFromText($cmdopts.rule)
            if $user {
                $user.addRule($rule, gateiden=$cmdopts.gate, indx=$cmdopts.index)
                $lib.print(`Added rule {$cmdopts.rule} to user {$cmdopts.name}.`)
            } else {
                $lib.warn('User ({name}) not found!', name=$cmdopts.name)
            }
        ''',
    },
    {
        'name': 'auth.user.delrule',
        'descr': '''
            Remove a rule from a user.

            Examples:

                // Delete the allow rule from the user "visi" for permission "foo.bar.baz"
                auth.user.delrule visi foo.bar.baz

                // Delete the deny rule from the user "visi" for permission "foo.bar.baz"
                auth.user.delrule visi "!foo.bar.baz"

                // Delete the rule at index 5 from the user "visi"
                auth.user.delrule visi --index  5
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the user.'}),
            ('rule', {'type': 'str', 'help': 'The rule string.'}),
            ('--gate', {'type': 'str', 'help': 'The auth gate id to grant permission on.', 'default': None}),
            ('--index', {'type': 'bool', 'action': 'store_true', 'default': False,
                'help': 'Specify the rule as a 0 based index into the list of rules.'}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.name)
            if $user {
                if $cmdopts.index {
                    $rule = $user.popRule($cmdopts.rule, gateiden=$cmdopts.gate)
                } else {
                    $rule = $lib.auth.ruleFromText($cmdopts.rule)
                    $user.delRule($rule, gateiden=$cmdopts.gate)
                }
                $ruletext = $lib.auth.textFromRule($rule)
                $lib.print(`Removed rule {$ruletext} from user {$cmdopts.name}.`)
            } else {
                $lib.warn(`User ({$cmdopts.name}) not found!`)
            }
        ''',
    },
    {
        'name': 'auth.role.addrule',
        'descr': '''
            Add a rule to a role.

            Examples:

                // add an allow rule to the role "ninjas" for permission "foo.bar.baz"
                auth.role.addrule ninjas foo.bar.baz

                // add a deny rule to the role "ninjas" for permission "foo.bar.baz"
                auth.role.addrule ninjas "!foo.bar.baz"

                // add an allow rule to the role "ninjas" for permission "baz" at the first index.
                auth.role.addrule ninjas baz --index 0
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the role.'}),
            ('rule', {'type': 'str', 'help': 'The rule string.'}),
            ('--gate', {'type': 'str', 'help': 'The auth gate id to add the rule to.', 'default': None}),
            ('--index', {'type': 'int', 'help': 'Specify the rule location as a 0 based index.', 'default': None}),
        ),
        'storm': '''
            $role = $lib.auth.roles.byname($cmdopts.name)
            $rule = $lib.auth.ruleFromText($cmdopts.rule)
            if $role {
                $role.addRule($rule, gateiden=$cmdopts.gate, indx=$cmdopts.index)
                $lib.print(`Added rule {$cmdopts.rule} to role {$cmdopts.name}.`)
            } else {
                $lib.warn('Role ({name}) not found!', name=$cmdopts.name)
            }
        ''',
    },
    {
        'name': 'auth.role.delrule',
        'descr': '''
            Remove a rule from a role.

            Examples:

                // Delete the allow rule from the role "ninjas" for permission "foo.bar.baz"
                auth.role.delrule ninjas foo.bar.baz

                // Delete the deny rule from the role "ninjas" for permission "foo.bar.baz"
                auth.role.delrule ninjas "!foo.bar.baz"

                // Delete the rule at index 5 from the role "ninjas"
                auth.role.delrule ninjas --index  5
        ''',
        'cmdargs': (
            ('name', {'type': 'str', 'help': 'The name of the role.'}),
            ('rule', {'type': 'str', 'help': 'The rule string.'}),
            ('--gate', {'type': 'str', 'help': 'The auth gate id to remove the rule from.', 'default': None}),
            ('--index', {'type': 'bool', 'action': 'store_true', 'default': False,
                'help': 'Specify the rule as a 0 based index into the list of rules.'}),
        ),
        'storm': '''
            $role = $lib.auth.roles.byname($cmdopts.name)
            if $role {
                if $cmdopts.index {
                    $rule = $role.popRule($cmdopts.rule, gateiden=$cmdopts.gate)
                } else {
                    $rule = $lib.auth.ruleFromText($cmdopts.rule)
                    $role.delRule($rule, gateiden=$cmdopts.gate)
                }
                $ruletext = $lib.auth.textFromRule($rule)
                $lib.print(`Removed rule {$ruletext} from role {$cmdopts.name}.`)
            } else {
                $lib.warn(`Role ({$cmdopts.name}) not found!`)
            }
        ''',
    },
    {
        'name': 'auth.user.grant',
        'descr': '''
            Grant a role to a user.

            Examples:

                // Grant the role "ninjas" to the user "visi"
                auth.user.grant visi ninjas

                // Grant the role "ninjas" to the user "visi" at the first index.
                auth.user.grant visi ninjas --index 0

        ''',
        'cmdargs': (
            ('username', {'type': 'str', 'help': 'The name of the user.'}),
            ('rolename', {'type': 'str', 'help': 'The name of the role.'}),
            ('--index', {'type': 'int', 'help': 'Specify the role location as a 0 based index.', 'default': None}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.username)
            if (not $user) { $lib.exit(`No user named: {$cmdopts.username}`) }

            $role = $lib.auth.roles.byname($cmdopts.rolename)
            if (not $role) { $lib.exit(`No role named: {$cmdopts.rolename}`) }

            $lib.print(`Granting role {$role.name} to user {$user.name}.`)
            $user.grant($role.iden, indx=$cmdopts.index)
        ''',
    },
    {
        'name': 'auth.user.revoke',
        'descr': '''
            Revoke a role from a user.

            Examples:

                // Revoke the role "ninjas" from the user "visi"
                auth.user.revoke visi ninjas

        ''',
        'cmdargs': (
            ('username', {'type': 'str', 'help': 'The name of the user.'}),
            ('rolename', {'type': 'str', 'help': 'The name of the role.'}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.username)
            if (not $user) { $lib.exit(`No user named: {$cmdopts.username}`) }

            $role = $lib.auth.roles.byname($cmdopts.rolename)
            if (not $role) { $lib.exit(`No role named: {$cmdopts.rolename}`) }

            if (not $user.roles().has($role)) {
                $lib.exit(`User {$cmdopts.username} does not have role {$cmdopts.rolename}`)
            }

            $lib.print(`Revoking role {$role.name} from user {$user.name}.`)
            $user.revoke($role.iden)
        ''',
    },
    {
        'name': 'auth.user.show',
        'descr': '''
            Display details for a given user by name.

            Examples:

                // Display details about the user "visi"
                auth.user.show visi
        ''',
        'cmdargs': (
            ('username', {'type': 'str', 'help': 'The name of the user.'}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.username)
            if (not $user) { $lib.exit(`No user named: {$cmdopts.username}`) }

            $lib.print(`User: {$user.name} ({$user.iden})`)
            $lib.print("")
            $lib.print(`  Locked: {$user.locked}`)
            $lib.print(`  Admin: {$user.admin}`)
            $lib.print(`  Email: {$user.email}`)
            $lib.print("  Rules:")
            for ($indx, $rule) in $lib.iters.enum($user.rules) {
                $ruletext = $lib.auth.textFromRule($rule)
                $lib.print(`    [{$lib.cast(str, $indx).ljust(3)}] - {$ruletext}`)
            }

            $lib.print("")
            $lib.print("  Roles:")
            for $role in $user.roles() {
                $lib.print(`    {$role.iden} - {$role.name}`)
            }

            $lib.print("")
            $lib.print("  Gates:")
            for $gate in $user.gates() {
                for $gateuser in $gate.users {
                    if  ( $gateuser.iden = $user.iden ) {
                        break
                    }
                }
                $lib.print(`    {$gate.iden} - ({$gate.type})`)
                $lib.print(`      Admin: {$gateuser.admin}`)
                for ($indx, $rule) in $lib.iters.enum($user.getRules(gateiden=$gate.iden)) {
                    $ruletext = $lib.auth.textFromRule($rule)
                    $indxtext = $lib.cast(str, $indx).ljust(3)
                    $lib.print(`      [{$indxtext}] - {$ruletext}`)
                }
            }
        ''',
    },
    {
        'name': 'auth.user.allowed',
        'descr': '''
            Show whether the user is allowed the given permission and why.

            Examples:

                auth.user.allowed visi foo.bar
        ''',
        'cmdargs': (
            ('username', {'type': 'str', 'help': 'The name of the user.'}),
            ('permname', {'type': 'str', 'help': 'The permission string.'}),
            ('--gate', {'type': 'str', 'help': 'An auth gate to test the perms against.'}),
        ),
        'storm': '''
            $user = $lib.auth.users.byname($cmdopts.username)
            if (not $user) { $lib.exit(`No user named: {$cmdopts.username}`) }

            ($allow, $reason) = $user.getAllowedReason($cmdopts.permname, gateiden=$cmdopts.gate)
            $lib.print(`allowed: {$allow} - {$reason}`)
        ''',
    },
    {
        'name': 'auth.role.show',
        'descr': '''

            Display details for a given role by name.

            Examples:

                // Display details about the role "ninjas"
                auth.role.show ninjas
        ''',
        'cmdargs': (
            ('rolename', {'type': 'str', 'help': 'The name of the role.'}),
        ),
        'storm': '''

            $role = $lib.auth.roles.byname($cmdopts.rolename)
            if (not $role) { $lib.exit(`No role named: {$cmdopts.rolename}`) }

            $lib.print(`Role: {$role.name} ({$role.iden})`)

            $lib.print("")
            $lib.print("  Rules:")
            for ($indx, $rule) in $lib.iters.enum($role.rules) {
                $ruletext = $lib.auth.textFromRule($rule)
                $indxtext = $lib.cast(str, $indx).ljust(3)
                $lib.print(`    [{$indxtext}] - {$ruletext}`)
            }

            $lib.print("")
            $lib.print("  Gates:")
            for $gate in $role.gates() {
                $lib.print(`    {$gate.iden} - ({$gate.type})`)
                for ($indx, $rule) in $lib.iters.enum($role.getRules(gateiden=$gate.iden)) {
                    $ruletext = $lib.auth.textFromRule($rule)
                    $indxtext = $lib.cast(str, $indx).ljust(3)
                    $lib.print(`      [{$indxtext}] - {$ruletext}`)
                }
            }
        '''
    },
    {
        'name': 'auth.gate.show',
        'descr': '''

            Display users, roles, and permissions for an auth gate.

            Examples:
                // Display the users and roles with permissions to the top layer of the current view.
                auth.gate.show $lib.layer.get().iden

                // Display the users and roles with permissions to the current view.
                auth.gate.show $lib.view.get().iden
        ''',
        'cmdargs': (
            ('gateiden', {'type': 'str', 'help': 'The GUID of the auth gate.'}),
        ),
        'storm': '''

            $gate = $lib.auth.gates.get($cmdopts.gateiden)
            if (not $gate) { $lib.exit(`No auth gate found for iden: {$cmdopts.gateiden}.`) }

            $lib.print(`Gate Type: {$gate.type}`)

            $lib.print("")
            $lib.print("Auth Gate Users:")
            for $gateuser in $gate.users {
                $user = $lib.auth.users.get($gateuser.iden)
                $lib.print(`  {$user.iden} - {$user.name}`)
                $lib.print(`    Admin: {$gateuser.admin}`)
                $lib.print(`    Rules:`)
                for ($indx, $rule) in $lib.iters.enum($gateuser.rules) {
                    $ruletext = $lib.auth.textFromRule($rule)
                    $indxtext = $lib.cast(str, $indx).ljust(3)
                    $lib.print(`     [{$indxtext}] - {$ruletext}`)
                }
            }

            $lib.print("")
            $lib.print("Auth Gate Roles:")
            for $gaterole in $gate.roles {
                $role = $lib.auth.roles.get($gaterole.iden)
                $lib.print(`  {$role.iden} - {$role.name}`)
                $lib.print(`    Rules:`)
                for ($indx, $rule) in $lib.iters.enum($gaterole.rules) {
                    $ruletext = $lib.auth.textFromRule($rule)
                    $indxtext = $lib.cast(str, $indx).ljust(3)
                    $lib.print(`      [{$indxtext}] - {$ruletext}`)
                }
            }
        '''
    },
    {
        'name': 'auth.perms.list',
        'descr': 'Display a list of the current permissions defined within the Cortex.',
        'cmdargs': (),
        'storm': '''

            for $pdef in $lib.auth.getPermDefs() {
                $perm = $lib.str.join(".", $pdef.perm)

                $lib.print($perm)
                $lib.print(`    {$pdef.desc}`)
                $lib.print(`    gate: {$pdef.gate}`)
                $lib.print(`    default: {$pdef.default}`)
                if $pdef.ex { $lib.print(`    example: {$pdef.ex}`) }
                $lib.print('')
            }
        '''
    },
)

[docs] def ruleFromText(text): ''' Get a rule tuple from a text string. Args: text (str): The string to process. Returns: (bool, tuple): A tuple containing a bool and a list of permission parts. ''' allow = True if text.startswith('!'): text = text[1:] allow = False return (allow, tuple(text.split('.')))
[docs] @s_stormtypes.registry.registerType class UserProfile(s_stormtypes.Prim): ''' The Storm deref/setitem/iter convention on top of User profile information. ''' _storm_typename = 'auth:user:profile' _ismutable = True def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt
[docs] async def deref(self, name): name = await s_stormtypes.tostr(name) self.runt.confirm(('auth', 'user', 'get', 'profile', name)) return await self.runt.snap.core.getUserProfInfo(self.valu, name)
[docs] async def setitem(self, name, valu): name = await s_stormtypes.tostr(name) if valu is s_stormtypes.undef: self.runt.confirm(('auth', 'user', 'pop', 'profile', name)) await self.runt.snap.core.popUserProfInfo(self.valu, name) return valu = await s_stormtypes.toprim(valu) self.runt.confirm(('auth', 'user', 'set', 'profile', name)) await self.runt.snap.core.setUserProfInfo(self.valu, name, valu)
[docs] async def iter(self): profile = await self.value() for item in list(profile.items()): yield item
[docs] async def value(self): self.runt.confirm(('auth', 'user', 'get', 'profile')) return await self.runt.snap.core.getUserProfile(self.valu)
[docs] @s_stormtypes.registry.registerType class UserJson(s_stormtypes.Prim): ''' Implements per-user JSON storage. ''' _storm_typename = 'auth:user:json' _ismutable = False _storm_locals = ( {'name': 'get', 'desc': 'Return a stored JSON object or object property for the user.', 'type': {'type': 'function', '_funcname': 'get', 'args': ( {'name': 'path', 'type': 'str|list', 'desc': 'A path string or list of path parts.'}, {'name': 'prop', 'type': 'str|list', 'desc': 'A property name or list of name parts.', 'default': None}, ), 'returns': {'type': 'prim', 'desc': 'The previously stored value or $lib.null'}}}, {'name': 'set', 'desc': 'Set a JSON object or object property for the user.', 'type': {'type': 'function', '_funcname': 'set', 'args': ( {'name': 'path', 'type': 'str|list', 'desc': 'A path string or list of path elements.'}, {'name': 'valu', 'type': 'prim', 'desc': 'The value to set as the JSON object or object property.'}, {'name': 'prop', 'type': 'str|list', 'desc': 'A property name or list of name parts.', 'default': None}, ), 'returns': {'type': 'boolean', 'desc': 'True if the set operation was successful.'}}}, {'name': 'del', 'desc': 'Delete a stored JSON object or object property for the user.', 'type': {'type': 'function', '_funcname': '_del', 'args': ( {'name': 'path', 'type': 'str|list', 'desc': 'A path string or list of path parts.'}, {'name': 'prop', 'type': 'str|list', 'desc': 'A property name or list of name parts.', 'default': None}, ), 'returns': {'type': 'boolean', 'desc': 'True if the del operation was successful.'}}}, {'name': 'iter', 'desc': 'Yield (<path>, <valu>) tuples for the users JSON objects.', 'type': {'type': 'function', '_funcname': 'iter', 'args': ( {'name': 'path', 'type': 'str|list', 'desc': 'A path string or list of path parts.', 'default': None}, ), 'returns': {'name': 'Yields', 'type': 'list', 'desc': '(<path>, <item>) tuples.'}}}, ) def __init__(self, runt, valu): s_stormtypes.Prim.__init__(self, valu) self.runt = runt self.locls.update({ 'get': self.get, 'set': self.set, 'has': self.has, 'del': self._del, 'iter': self.iter, })
[docs] @s_stormtypes.stormfunc(readonly=True) async def has(self, path): path = await s_stormtypes.toprim(path) if isinstance(path, str): path = tuple(path.split('/')) fullpath = ('users', self.valu, 'json') + path if self.runt.user.iden != self.valu: self.runt.confirm(('user', 'json', 'get')) return await self.runt.snap.core.hasJsonObj(fullpath)
[docs] @s_stormtypes.stormfunc(readonly=True) async def get(self, path, prop=None): path = await s_stormtypes.toprim(path) prop = await s_stormtypes.toprim(prop) if isinstance(path, str): path = tuple(path.split('/')) fullpath = ('users', self.valu, 'json') + path if self.runt.user.iden != self.valu: self.runt.confirm(('user', 'json', 'get')) if prop is None: return await self.runt.snap.core.getJsonObj(fullpath) return await self.runt.snap.core.getJsonObjProp(fullpath, prop=prop)
[docs] async def set(self, path, valu, prop=None): path = await s_stormtypes.toprim(path) valu = await s_stormtypes.toprim(valu) prop = await s_stormtypes.toprim(prop) if isinstance(path, str): path = tuple(path.split('/')) fullpath = ('users', self.valu, 'json') + path if self.runt.user.iden != self.valu: self.runt.confirm(('user', 'json', 'set')) if prop is None: await self.runt.snap.core.setJsonObj(fullpath, valu) return True return await self.runt.snap.core.setJsonObjProp(fullpath, prop, valu)
async def _del(self, path, prop=None): path = await s_stormtypes.toprim(path) prop = await s_stormtypes.toprim(prop) if isinstance(path, str): path = tuple(path.split('/')) fullpath = ('users', self.valu, 'json') + path if self.runt.user.iden != self.valu: self.runt.confirm(('user', 'json', 'set')) if prop is None: await self.runt.snap.core.delJsonObj(fullpath) return True return await self.runt.snap.core.delJsonObjProp(fullpath, prop=prop)
[docs] @s_stormtypes.stormfunc(readonly=True) async def iter(self, path=None): path = await s_stormtypes.toprim(path) if self.runt.user.iden != self.valu: self.runt.confirm(('user', 'json', 'get')) fullpath = ('users', self.valu, 'json') if path is not None: if isinstance(path, str): path = tuple(path.split('/')) fullpath += path async for path, item in self.runt.snap.core.getJsonObjs(fullpath): yield path, item
[docs] @s_stormtypes.registry.registerType class UserVars(s_stormtypes.Prim): ''' The Storm deref/setitem/iter convention on top of User vars information. ''' _storm_typename = 'auth:user:vars' _ismutable = True def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt
[docs] async def deref(self, name): name = await s_stormtypes.tostr(name) return await self.runt.snap.core.getUserVarValu(self.valu, name)
[docs] async def setitem(self, name, valu): name = await s_stormtypes.tostr(name) if valu is s_stormtypes.undef: await self.runt.snap.core.popUserVarValu(self.valu, name) return valu = await s_stormtypes.toprim(valu) await self.runt.snap.core.setUserVarValu(self.valu, name, valu)
[docs] async def iter(self): async for name, valu in self.runt.snap.core.iterUserVars(self.valu): yield name, valu await asyncio.sleep(0)
[docs] @s_stormtypes.registry.registerType class User(s_stormtypes.Prim): ''' Implements the Storm API for a User. ''' _storm_locals = ( {'name': 'iden', 'desc': 'The User iden.', 'type': 'str', }, {'name': 'get', 'desc': 'Get a arbitrary property from the User definition.', 'type': {'type': 'function', '_funcname': '_methUserGet', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the property to return.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'roles', 'desc': 'Get the Roles for the User.', 'type': {'type': 'function', '_funcname': '_methUserRoles', 'returns': {'type': 'list', 'desc': 'A list of ``auth:roles`` which the user is a member of.', }}}, {'name': 'pack', 'desc': 'Get the packed version of the User.', 'type': {'type': 'function', '_funcname': '_methUserPack', 'args': (), 'returns': {'type': 'dict', 'desc': 'The packed User definition.', }}}, {'name': 'allowed', 'desc': 'Check if the user has a given permission.', 'type': {'type': 'function', '_funcname': '_methUserAllowed', 'args': ( {'name': 'permname', 'type': 'str', 'desc': 'The permission string to check.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The authgate iden.', 'default': None, }, {'name': 'default', 'type': 'boolean', 'desc': 'The default value.', 'default': False, }, ), 'returns': {'type': 'boolean', 'desc': 'True if the rule is allowed, False otherwise.', }}}, {'name': 'getAllowedReason', 'desc': 'Return an allowed status and reason for the given perm.', 'type': {'type': 'function', '_funcname': '_methGetAllowedReason', 'args': ( {'name': 'permname', 'type': 'str', 'desc': 'The permission string to check.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The authgate iden.', 'default': None, }, {'name': 'default', 'type': 'boolean', 'desc': 'The default value.', 'default': False, }, ), 'returns': {'type': 'list', 'desc': 'An (allowed, reason) tuple.', }}}, {'name': 'grant', 'desc': 'Grant a Role to the User.', 'type': {'type': 'function', '_funcname': '_methUserGrant', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the Role.', }, {'name': 'indx', 'type': 'int', 'desc': 'The position of the Role as a 0 based index.', 'default': None, }, ), 'returns': {'type': 'null', }}}, {'name': 'setRoles', 'desc': ''' Replace all the Roles of the User with a new list of roles. Notes: The roleiden for the "all" role must be present in the new list of roles. This replaces all existing roles that the user has with the new roles. ''', 'type': {'type': 'function', '_funcname': '_methUserSetRoles', 'args': ( {'name': 'idens', 'type': 'list', 'desc': 'The idens of the Roles to set on the User.', }, ), 'returns': {'type': 'null', }}}, {'name': 'revoke', 'desc': 'Remove a Role from the User', 'type': {'type': 'function', '_funcname': '_methUserRevoke', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the Role.', }, ), 'returns': {'type': 'null', }}}, {'name': 'tell', 'desc': 'Send a tell notification to a user.', 'type': {'type': 'function', '_funcname': '_methUserTell', 'args': ( {'name': 'text', 'type': 'str', 'desc': 'The text of the message to send.', }, ), 'returns': {'type': 'null', }}}, {'name': 'notify', 'desc': 'Send an arbitrary user notification.', 'type': {'type': 'function', '_funcname': '_methUserNotify', 'args': ( {'name': 'mesgtype', 'type': 'str', 'desc': 'The notification type.', }, {'name': 'mesgdata', 'type': 'dict', 'desc': 'The notification data.', }, ), 'returns': {'type': 'null', }}}, {'name': 'addRule', 'desc': 'Add a rule to the User.', 'type': {'type': 'function', '_funcname': '_methUserAddRule', 'args': ( {'name': 'rule', 'type': 'list', 'desc': 'The rule tuple to add to the User.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, }, {'name': 'indx', 'type': 'int', 'desc': 'The position of the rule as a 0 based index.', 'default': None, } ), 'returns': {'type': 'null', }}}, {'name': 'delRule', 'desc': 'Remove a rule from the User.', 'type': {'type': 'function', '_funcname': '_methUserDelRule', 'args': ( {'name': 'rule', 'type': 'list', 'desc': 'The rule tuple to removed from the User.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, } ), 'returns': {'type': 'null', }}}, {'name': 'popRule', 'desc': 'Remove a rule by index from the User.', 'type': {'type': 'function', '_funcname': '_methUserPopRule', 'args': ( {'name': 'indx', 'type': 'int', 'desc': 'The index of the rule to remove.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, } ), 'returns': {'type': 'list', 'desc': 'The rule which was removed.'}}}, {'name': 'setRules', 'desc': 'Replace the rules on the User with new rules.', 'type': {'type': 'function', '_funcname': '_methUserSetRules', 'args': ( {'name': 'rules', 'type': 'list', 'desc': 'A list of rule tuples.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rules.', 'default': None, } ), 'returns': {'type': 'null', }}}, {'name': 'getRules', 'desc': 'Get the rules for the user and optional auth gate.', 'type': {'type': 'function', '_funcname': '_methGetRules', 'args': ( {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rules.', 'default': None}, ), 'returns': {'type': 'list', 'desc': 'A list of rules.'}}}, {'name': 'setAdmin', 'desc': 'Set the Admin flag for the user.', 'type': {'type': 'function', '_funcname': '_methUserSetAdmin', 'args': ( {'name': 'admin', 'type': 'boolean', 'desc': 'True to make the User an admin, false to remove their admin status.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the operation.', 'default': None, } ), 'returns': {'type': 'null', }}}, {'name': 'setEmail', 'desc': 'Set the email address of the User.', 'type': {'type': 'function', '_funcname': '_methUserSetEmail', 'args': ( {'name': 'email', 'type': 'str', 'desc': 'The email address to set for the User.', }, ), 'returns': {'type': 'null', }}}, {'name': 'setLocked', 'desc': 'Set the locked status for a user.', 'type': {'type': 'function', '_funcname': '_methUserSetLocked', 'args': ( {'name': 'locked', 'type': 'boolean', 'desc': 'True to lock the user, false to unlock them.', }, ), 'returns': {'type': 'null', }}}, {'name': 'setArchived', 'desc': ''' Set the archived status for a user. Notes: Setting a user as "archived" will also lock the user. Removing a users "archived" status will not unlock the user. ''', 'type': {'type': 'function', '_funcname': '_methUserSetArchived', 'args': ( {'name': 'archived', 'type': 'boolean', 'desc': 'True to archive the user, false to unarchive them.', }, ), 'returns': {'type': 'null', }}}, {'name': 'setPasswd', 'desc': 'Set the Users password.', 'type': {'type': 'function', '_funcname': '_methUserSetPasswd', 'args': ( {'name': 'passwd', 'type': 'str', 'desc': 'The new password for the user. This is best passed into the runtime as a variable.', }, ), 'returns': {'type': 'null', }}}, {'name': 'gates', 'desc': 'Return a list of auth gates that the user has rules for.', 'type': {'type': 'function', '_funcname': '_methGates', 'args': (), 'returns': {'type': 'list', 'desc': 'A list of ``auth:gates`` that the user has rules for.', }}}, {'name': 'name', 'desc': ''' A user's name. This can also be used to set a user's name. Example: Change a user's name:: $user=$lib.auth.users.byname(bob) $user.name=robert ''', 'type': {'type': 'stor', '_storfunc': '_storUserName', 'returns': {'type': 'str', }}}, {'name': 'email', 'desc': ''' A user's email. This can also be used to set the user's email. Example: Change a user's email address:: $user=$lib.auth.users.byname(bob) $user.email="[email protected]" ''', 'type': {'type': ['stor'], '_storfunc': '_methUserSetEmail', 'returns': {'type': ['str', 'null'], }}}, {'name': 'profile', 'desc': ''' A user profile dictionary. This can be used as an application level key-value store. Example: Set a value:: $user=$lib.auth.users.byname(bob) $user.profile.somekey="somevalue" Get a value:: $user=$lib.auth.users.byname(bob) $value = $user.profile.somekey ''', 'type': {'type': ['ctor'], '_ctorfunc': '_ctorUserProfile', 'returns': {'type': 'auth:user:profile', }}}, {'name': 'vars', 'desc': "Get a dictionary representing the user's persistent variables.", 'type': {'type': ['ctor'], '_ctorfunc': '_ctorUserVars', 'returns': {'type': 'auth:user:vars'}}}, {'name': 'genApiKey', 'desc': '''Generate a new API key for the user. Notes: The secret API key returned by this function cannot be accessed again. ''', 'type': {'type': 'function', '_funcname': '_methGenApiKey', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the API key.'}, {'name': 'duration', 'type': 'integer', 'default': None, 'desc': 'Duration of time for the API key to be valid, in milliseconds.'}, ), 'returns': {'type': 'list', 'desc': 'A list, containing the secret API key and a dictionary containing metadata about the key.'}}}, {'name': 'getApiKey', 'desc': "Get information about a user's existing API key.", 'type': {'type': 'function', '_funcname': '_methGetApiKey', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the API key.'}, ), 'returns': {'type': 'dict', 'desc': 'A dictionary containing metadata about the key.'}}}, {'name': 'listApiKeys', 'desc': 'Get information about all the API keys the user has.', 'type': {'type': 'function', '_funcname': '_methListApiKeys', 'args': (), 'returns': {'type': 'list', 'desc': 'A list of dictionaries containing metadata about each key.'}}}, {'name': 'modApiKey', 'desc': 'Modify metadata about an existing API key.', 'type': {'type': 'function', '_funcname': '_methModApiKey', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the API key.'}, {'name': 'name', 'type': 'str', 'desc': 'The name of the valu to update.'}, {'name': 'valu', 'type': 'any', 'desc': 'The new value of the API key.'}, ), 'returns': {'type': 'dict', 'desc': 'An updated dictionary with metadata about the key.'}}}, {'name': 'delApiKey', 'desc': 'Delete an existing API key.', 'type': {'type': 'function', '_funcname': '_methDelApiKey', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the API key.'}, ), 'returns': {'type': 'boolean', 'desc': 'True when the key was deleted.'}}}, ) _storm_typename = 'auth:user' _ismutable = False def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt self.locls.update(self.getObjLocals()) self.locls['iden'] = self.valu self.stors.update({ 'name': self._storUserName, 'email': self._methUserSetEmail, }) self.ctors.update({ 'json': self._ctorUserJson, 'vars': self._ctorUserVars, 'profile': self._ctorUserProfile, }) def __hash__(self): return hash((self._storm_typename, self.locls['iden'])) def _ctorUserJson(self, path=None): return UserJson(self.runt, self.valu) def _ctorUserProfile(self, path=None): return UserProfile(self.runt, self.valu) def _ctorUserVars(self, path=None): if self.runt.user.iden != self.valu and not self.runt.isAdmin(): mesg = '$user.vars requires admin privs when $user is not the current user.' raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) return UserVars(self.runt, self.valu)
[docs] def getObjLocals(self): return { 'get': self._methUserGet, 'pack': self._methUserPack, 'tell': self._methUserTell, 'gates': self._methGates, 'notify': self._methUserNotify, 'roles': self._methUserRoles, 'allowed': self._methUserAllowed, 'grant': self._methUserGrant, 'revoke': self._methUserRevoke, 'addRule': self._methUserAddRule, 'delRule': self._methUserDelRule, 'popRule': self._methUserPopRule, 'setRoles': self._methUserSetRoles, 'getRules': self._methGetRules, 'setRules': self._methUserSetRules, 'setAdmin': self._methUserSetAdmin, 'setEmail': self._methUserSetEmail, 'setLocked': self._methUserSetLocked, 'setPasswd': self._methUserSetPasswd, 'setArchived': self._methUserSetArchived, 'getAllowedReason': self._methGetAllowedReason, 'genApiKey': self._methGenApiKey, 'getApiKey': self._methGetApiKey, 'listApiKeys': self._methListApiKeys, 'modApiKey': self._methModApiKey, 'delApiKey': self._methDelApiKey, }
@s_stormtypes.stormfunc(readonly=True) async def _methUserPack(self): return await self.value() async def _methUserTell(self, text): self.runt.confirm(('tell', self.valu), default=True) mesgdata = { 'text': await s_stormtypes.tostr(text), 'from': self.runt.user.iden, } return await self.runt.snap.core.addUserNotif(self.valu, 'tell', mesgdata) async def _methUserNotify(self, mesgtype, mesgdata): if not self.runt.isAdmin(): mesg = '$user.notify() method requires admin privs.' raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name) mesgtype = await s_stormtypes.tostr(mesgtype) mesgdata = await s_stormtypes.toprim(mesgdata) return await self.runt.snap.core.addUserNotif(self.valu, mesgtype, mesgdata) async def _storUserName(self, name): name = await s_stormtypes.tostr(name) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'name'), default=True) await self.runt.snap.core.setUserName(self.valu, name) return self.runt.confirm(('auth', 'user', 'set', 'name')) await self.runt.snap.core.setUserName(self.valu, name) async def _derefGet(self, name): udef = await self.runt.snap.core.getUserDef(self.valu) return udef.get(name, s_common.novalu) async def _methUserGet(self, name): udef = await self.runt.snap.core.getUserDef(self.valu) return udef.get(name) @s_stormtypes.stormfunc(readonly=True) async def _methGates(self): user = self.runt.snap.core.auth.user(self.valu) retn = [] for gateiden in user.authgates.keys(): gate = await self.runt.snap.core.getAuthGate(gateiden) retn.append(Gate(self.runt, gate)) return retn @s_stormtypes.stormfunc(readonly=True) async def _methUserRoles(self): udef = await self.runt.snap.core.getUserDef(self.valu) return [Role(self.runt, rdef['iden']) for rdef in udef.get('roles')] @s_stormtypes.stormfunc(readonly=True) async def _methUserAllowed(self, permname, gateiden=None, default=False): permname = await s_stormtypes.tostr(permname) gateiden = await s_stormtypes.tostr(gateiden) default = await s_stormtypes.tobool(default) perm = tuple(permname.split('.')) user = await self.runt.snap.core.auth.reqUser(self.valu) return user.allowed(perm, gateiden=gateiden, default=default) @s_stormtypes.stormfunc(readonly=True) async def _methGetAllowedReason(self, permname, gateiden=None, default=False): permname = await s_stormtypes.tostr(permname) gateiden = await s_stormtypes.tostr(gateiden) default = await s_stormtypes.tobool(default) perm = tuple(permname.split('.')) user = await self.runt.snap.core.auth.reqUser(self.valu) reason = user.getAllowedReason(perm, gateiden=gateiden, default=default) return reason.value, reason.mesg async def _methUserGrant(self, iden, indx=None): self.runt.confirm(('auth', 'user', 'grant')) indx = await s_stormtypes.toint(indx, noneok=True) await self.runt.snap.core.addUserRole(self.valu, iden, indx=indx) async def _methUserSetRoles(self, idens): self.runt.confirm(('auth', 'user', 'grant')) self.runt.confirm(('auth', 'user', 'revoke')) idens = await s_stormtypes.toprim(idens) await self.runt.snap.core.setUserRoles(self.valu, idens) async def _methUserRevoke(self, iden): self.runt.confirm(('auth', 'user', 'revoke')) await self.runt.snap.core.delUserRole(self.valu, iden) async def _methUserSetRules(self, rules, gateiden=None): rules = await s_stormtypes.toprim(rules) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'user', 'set', 'rules'), gateiden=gateiden) await self.runt.snap.core.setUserRules(self.valu, rules, gateiden=gateiden) @s_stormtypes.stormfunc(readonly=True) async def _methGetRules(self, gateiden=None): gateiden = await s_stormtypes.tostr(gateiden, noneok=True) user = self.runt.snap.core.auth.user(self.valu) return user.getRules(gateiden=gateiden) async def _methUserAddRule(self, rule, gateiden=None, indx=None): rule = await s_stormtypes.toprim(rule) indx = await s_stormtypes.toint(indx, noneok=True) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'user', 'set', 'rules'), gateiden=gateiden) # TODO: Remove me in 3.0.0 if gateiden == 'cortex': mesg = f'Adding rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' await self.runt.snap.warn(mesg, log=False) await self.runt.snap.core.addUserRule(self.valu, rule, indx=indx, gateiden=gateiden) async def _methUserDelRule(self, rule, gateiden=None): rule = await s_stormtypes.toprim(rule) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'user', 'set', 'rules'), gateiden=gateiden) await self.runt.snap.core.delUserRule(self.valu, rule, gateiden=gateiden) async def _methUserPopRule(self, indx, gateiden=None): gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'user', 'set', 'rules'), gateiden=gateiden) indx = await s_stormtypes.toint(indx) rules = list(await self._methGetRules(gateiden=gateiden)) if len(rules) <= indx: mesg = f'User {self.valu} only has {len(rules)} rules.' raise s_exc.BadArg(mesg=mesg) retn = rules.pop(indx) await self.runt.snap.core.setUserRules(self.valu, rules, gateiden=gateiden) return retn async def _methUserSetEmail(self, email): email = await s_stormtypes.tostr(email) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'email'), default=True) await self.runt.snap.core.setUserEmail(self.valu, email) return self.runt.confirm(('auth', 'user', 'set', 'email')) await self.runt.snap.core.setUserEmail(self.valu, email) async def _methUserSetAdmin(self, admin, gateiden=None): gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'user', 'set', 'admin'), gateiden=gateiden) admin = await s_stormtypes.tobool(admin) await self.runt.snap.core.setUserAdmin(self.valu, admin, gateiden=gateiden) async def _methUserSetPasswd(self, passwd): passwd = await s_stormtypes.tostr(passwd, noneok=True) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'passwd'), default=True) return await self.runt.snap.core.setUserPasswd(self.valu, passwd) self.runt.confirm(('auth', 'user', 'set', 'passwd')) return await self.runt.snap.core.setUserPasswd(self.valu, passwd) async def _methUserSetLocked(self, locked): self.runt.confirm(('auth', 'user', 'set', 'locked')) await self.runt.snap.core.setUserLocked(self.valu, await s_stormtypes.tobool(locked)) async def _methUserSetArchived(self, archived): self.runt.confirm(('auth', 'user', 'set', 'archived')) await self.runt.snap.core.setUserArchived(self.valu, await s_stormtypes.tobool(archived)) async def _methGenApiKey(self, name, duration=None): name = await s_stormtypes.tostr(name) duration = await s_stormtypes.toint(duration, noneok=True) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'apikey'), default=True) return await self.runt.snap.core.addUserApiKey(self.valu, name, duration=duration) self.runt.confirm(('auth', 'user', 'set', 'apikey')) return await self.runt.snap.core.addUserApiKey(self.valu, name, duration=duration) @s_stormtypes.stormfunc(readonly=True) async def _methGetApiKey(self, iden): iden = await s_stormtypes.tostr(iden) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'apikey'), default=True) valu = await self.runt.snap.core.getUserApiKey(iden) else: self.runt.confirm(('auth', 'user', 'set', 'apikey')) valu = await self.runt.snap.core.getUserApiKey(iden) valu.pop('shadow', None) return valu @s_stormtypes.stormfunc(readonly=True) async def _methListApiKeys(self): if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'apikey'), default=True) return await self.runt.snap.core.listUserApiKeys(self.valu) self.runt.confirm(('auth', 'user', 'set', 'apikey')) return await self.runt.snap.core.listUserApiKeys(self.valu) async def _methModApiKey(self, iden, name, valu): iden = await s_stormtypes.tostr(iden) name = await s_stormtypes.tostr(name) valu = await s_stormtypes.toprim(valu) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'apikey'), default=True) return await self.runt.snap.core.modUserApiKey(iden, name, valu) self.runt.confirm(('auth', 'user', 'set', 'apikey')) return await self.runt.snap.core.modUserApiKey(iden, name, valu) async def _methDelApiKey(self, iden): iden = await s_stormtypes.tostr(iden) if self.runt.user.iden == self.valu: self.runt.confirm(('auth', 'self', 'set', 'apikey'), default=True) return await self.runt.snap.core.delUserApiKey(iden) self.runt.confirm(('auth', 'user', 'set', 'apikey')) return await self.runt.snap.core.delUserApiKey(iden)
[docs] async def value(self): return await self.runt.snap.core.getUserDef(self.valu)
[docs] async def stormrepr(self): return f'{self._storm_typename}: {await self.value()}'
[docs] @s_stormtypes.registry.registerType class Role(s_stormtypes.Prim): ''' Implements the Storm API for a Role. ''' _storm_locals = ( {'name': 'iden', 'desc': 'The Role iden.', 'type': 'str', }, {'name': 'get', 'desc': 'Get a arbitrary property from the Role definition.', 'type': {'type': 'function', '_funcname': '_methRoleGet', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the property to return.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'pack', 'desc': 'Get the packed version of the Role.', 'type': {'type': 'function', '_funcname': '_methRolePack', 'args': (), 'returns': {'type': 'dict', 'desc': 'The packed Role definition.', }}}, {'name': 'gates', 'desc': 'Return a list of auth gates that the role has rules for.', 'type': {'type': 'function', '_funcname': '_methGates', 'args': (), 'returns': {'type': 'list', 'desc': 'A list of ``auth:gates`` that the role has rules for.', }}}, {'name': 'addRule', 'desc': 'Add a rule to the Role', 'type': {'type': 'function', '_funcname': '_methRoleAddRule', 'args': ( {'name': 'rule', 'type': 'list', 'desc': 'The rule tuple to added to the Role.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, }, {'name': 'indx', 'type': 'int', 'desc': 'The position of the rule as a 0 based index.', 'default': None, } ), 'returns': {'type': 'null', }}}, {'name': 'delRule', 'desc': 'Remove a rule from the Role.', 'type': {'type': 'function', '_funcname': '_methRoleDelRule', 'args': ( {'name': 'rule', 'type': 'list', 'desc': 'The rule tuple to removed from the Role.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, }, ), 'returns': {'type': 'null', } }}, {'name': 'popRule', 'desc': 'Remove a rule by index from the Role.', 'type': {'type': 'function', '_funcname': '_methRolePopRule', 'args': ( {'name': 'indx', 'type': 'int', 'desc': 'The index of the rule to remove.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rule.', 'default': None, } ), 'returns': {'type': 'list', 'desc': 'The rule which was removed.'}}}, {'name': 'getRules', 'desc': 'Get the rules for the role and optional auth gate.', 'type': {'type': 'function', '_funcname': '_methGetRules', 'args': ( {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rules.', 'default': None}, ), 'returns': {'type': 'list', 'desc': 'A list of rules.'}}}, {'name': 'setRules', 'desc': 'Replace the rules on the Role with new rules.', 'type': {'type': 'function', '_funcname': '_methRoleSetRules', 'args': ( {'name': 'rules', 'type': 'list', 'desc': 'A list of rules to set on the Role.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The gate iden used for the rules.', 'default': None, }, ), 'returns': {'type': 'null', }}}, {'name': 'name', 'desc': ''' A role's name. This can also be used to set the role name. Example: Change a role's name:: $role=$lib.auth.roles.byname(analyst) $role.name=superheroes ''', 'type': {'type': 'stor', '_storfunc': '_setRoleName', 'returns': {'type': 'str', }}}, ) _storm_typename = 'auth:role' _ismutable = False def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt self.locls.update(self.getObjLocals()) self.locls['iden'] = self.valu self.stors.update({ 'name': self._setRoleName, }) def __hash__(self): return hash((self._storm_typename, self.locls['iden']))
[docs] def getObjLocals(self): return { 'get': self._methRoleGet, 'pack': self._methRolePack, 'gates': self._methGates, 'addRule': self._methRoleAddRule, 'delRule': self._methRoleDelRule, 'popRule': self._methRolePopRule, 'setRules': self._methRoleSetRules, 'getRules': self._methGetRules, }
async def _derefGet(self, name): rdef = await self.runt.snap.core.getRoleDef(self.valu) return rdef.get(name, s_common.novalu) async def _setRoleName(self, name): self.runt.confirm(('auth', 'role', 'set', 'name')) name = await s_stormtypes.tostr(name) await self.runt.snap.core.setRoleName(self.valu, name) @s_stormtypes.stormfunc(readonly=True) async def _methRoleGet(self, name): rdef = await self.runt.snap.core.getRoleDef(self.valu) return rdef.get(name) @s_stormtypes.stormfunc(readonly=True) async def _methRolePack(self): return await self.value() @s_stormtypes.stormfunc(readonly=True) async def _methGates(self): role = self.runt.snap.core.auth.role(self.valu) retn = [] for gateiden in role.authgates.keys(): gate = await self.runt.snap.core.getAuthGate(gateiden) retn.append(Gate(self.runt, gate)) return retn @s_stormtypes.stormfunc(readonly=True) async def _methGetRules(self, gateiden=None): gateiden = await s_stormtypes.tostr(gateiden, noneok=True) role = self.runt.snap.core.auth.role(self.valu) return role.getRules(gateiden=gateiden) async def _methRoleSetRules(self, rules, gateiden=None): rules = await s_stormtypes.toprim(rules) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'role', 'set', 'rules'), gateiden=gateiden) await self.runt.snap.core.setRoleRules(self.valu, rules, gateiden=gateiden) async def _methRoleAddRule(self, rule, gateiden=None, indx=None): rule = await s_stormtypes.toprim(rule) indx = await s_stormtypes.toint(indx, noneok=True) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'role', 'set', 'rules'), gateiden=gateiden) # TODO: Remove me in 3.0.0 if gateiden == 'cortex': mesg = f'Adding rule on the "cortex" authgate. This authgate is not used ' \ f'for permission checks and will be removed in Synapse v3.0.0.' await self.runt.snap.warn(mesg, log=False) await self.runt.snap.core.addRoleRule(self.valu, rule, indx=indx, gateiden=gateiden) async def _methRoleDelRule(self, rule, gateiden=None): rule = await s_stormtypes.toprim(rule) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'role', 'set', 'rules'), gateiden=gateiden) await self.runt.snap.core.delRoleRule(self.valu, rule, gateiden=gateiden) async def _methRolePopRule(self, indx, gateiden=None): gateiden = await s_stormtypes.tostr(gateiden, noneok=True) self.runt.confirm(('auth', 'role', 'set', 'rules'), gateiden=gateiden) indx = await s_stormtypes.toint(indx) rules = list(await self._methGetRules(gateiden=gateiden)) if len(rules) <= indx: mesg = f'Role {self.valu} only has {len(rules)} rules.' raise s_exc.BadArg(mesg=mesg) retn = rules.pop(indx) await self.runt.snap.core.setRoleRules(self.valu, rules, gateiden=gateiden) return retn
[docs] async def value(self): return await self.runt.snap.core.getRoleDef(self.valu)
[docs] async def stormrepr(self): return f'{self._storm_typename}: {await self.value()}'
[docs] @s_stormtypes.registry.registerLib class LibAuth(s_stormtypes.Lib): ''' A Storm Library for interacting with Auth in the Cortex. ''' _storm_locals = ( {'name': 'ruleFromText', 'desc': 'Get a rule tuple from a text string.', 'type': {'type': 'function', '_funcname': 'ruleFromText', 'args': ( {'name': 'text', 'type': 'str', 'desc': 'The string to process.', }, ), 'returns': {'type': 'list', 'desc': 'A tuple containing a bool and a list of permission parts.', }}}, {'name': 'textFromRule', 'desc': 'Return a text string from a rule tuple.', 'type': {'type': 'function', '_funcname': 'textFromRule', 'args': ( {'name': 'rule', 'type': 'list', 'desc': 'A rule tuple.'}, ), 'returns': {'type': 'str', 'desc': 'The rule text.'}}}, {'name': 'getPermDefs', 'desc': 'Return a list of permission definitions.', 'type': {'type': 'function', '_funcname': 'getPermDefs', 'args': (), 'returns': {'type': 'list', 'desc': 'The list of permission definitions.'}}}, {'name': 'getPermDef', 'desc': 'Return a single permission definition.', 'type': {'type': 'function', '_funcname': 'getPermDef', 'args': ( {'name': 'perm', 'type': 'list', 'desc': 'A permission tuple.'}, ), 'returns': {'type': 'dict', 'desc': 'A permission definition or null.'}}}, ) _storm_lib_path = ('auth',)
[docs] def getObjLocals(self): return { 'getPermDef': self.getPermDef, 'getPermDefs': self.getPermDefs, 'ruleFromText': self.ruleFromText, 'textFromRule': self.textFromRule, }
[docs] @staticmethod @s_stormtypes.stormfunc(readonly=True) def ruleFromText(text): return ruleFromText(text)
[docs] @s_stormtypes.stormfunc(readonly=True) async def textFromRule(self, rule): rule = await s_stormtypes.toprim(rule) return s_common.reprauthrule(rule)
[docs] @s_stormtypes.stormfunc(readonly=True) async def getPermDefs(self): return self.runt.snap.core.getPermDefs()
[docs] @s_stormtypes.stormfunc(readonly=True) async def getPermDef(self, perm): perm = await s_stormtypes.toprim(perm) return self.runt.snap.core.getPermDef(perm)
[docs] @s_stormtypes.registry.registerType class StormUserVarsDict(s_stormtypes.Prim): ''' A Storm Primitive that maps the HiveDict interface to a user vars dictionary. ''' _storm_locals = ( {'name': 'get', 'desc': 'Get the value for a user var.', 'type': {'type': 'function', '_funcname': '_get', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the var.', }, {'name': 'default', 'type': 'prim', 'default': None, 'desc': 'The default value to return if not set.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'pop', 'desc': 'Remove a user var value.', 'type': {'type': 'function', '_funcname': '_pop', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the var.', }, {'name': 'default', 'type': 'prim', 'default': None, 'desc': 'The default value to return if not set.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'set', 'desc': 'Set a user var value.', 'type': {'type': 'function', '_funcname': '_set', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the var to set.', }, {'name': 'valu', 'type': 'prim', 'desc': 'The value to store.', }, ), 'returns': {'type': ['null', 'prim'], 'desc': 'Old value of the var if it was previously set, or none.', }}}, {'name': 'list', 'desc': 'List the vars and their values.', 'type': {'type': 'function', '_funcname': '_list', 'returns': {'type': 'list', 'desc': 'A list of tuples containing var, value pairs.', }}}, ) _storm_typename = 'user:vars:dict' _ismutable = True def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt self.locls.update(self.getObjLocals())
[docs] def getObjLocals(self): return { 'get': self._get, 'pop': self._pop, 'set': self._set, 'list': self._list, }
@s_stormtypes.stormfunc(readonly=True) async def _get(self, name, default=None): name = await s_stormtypes.tostr(name) return await self.runt.snap.core.getUserVarValu(self.valu, name, default=default) async def _pop(self, name, default=None): name = await s_stormtypes.tostr(name) return await self.runt.snap.core.popUserVarValu(self.valu, name, default=default) async def _set(self, name, valu): if not isinstance(name, str): mesg = 'The name of a variable must be a string.' raise s_exc.StormRuntimeError(mesg=mesg, name=name) name = await s_stormtypes.tostr(name) oldv = await self.runt.snap.core.getUserVarValu(self.valu, name) valu = await s_stormtypes.toprim(valu) await self.runt.snap.core.setUserVarValu(self.valu, name, valu) return oldv @s_stormtypes.stormfunc(readonly=True) async def _list(self): valu = await self.value() return list(valu.items())
[docs] async def iter(self): async for name, valu in self.runt.snap.core.iterUserVars(self.valu): yield name, valu await asyncio.sleep(0)
[docs] async def value(self): varz = {} async for key, valu in self.runt.snap.core.iterUserVars(self.valu): varz[key] = valu await asyncio.sleep(0) return varz
[docs] @s_stormtypes.registry.registerType class StormUserProfileDict(s_stormtypes.Prim): ''' A Storm Primitive that maps the HiveDict interface to a user profile dictionary. ''' _storm_locals = ( {'name': 'get', 'desc': 'Get a user profile value.', 'type': {'type': 'function', '_funcname': '_get', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the user profile value.', }, {'name': 'default', 'type': 'prim', 'default': None, 'desc': 'The default value to return if not set.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'pop', 'desc': 'Remove a user profile value.', 'type': {'type': 'function', '_funcname': '_pop', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the user profile value.', }, {'name': 'default', 'type': 'prim', 'default': None, 'desc': 'The default value to return if not set.', }, ), 'returns': {'type': 'prim', 'desc': 'The requested value.', }}}, {'name': 'set', 'desc': 'Set a user profile value.', 'type': {'type': 'function', '_funcname': '_set', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the user profile value to set.', }, {'name': 'valu', 'type': 'prim', 'desc': 'The value to store.', }, ), 'returns': {'type': ['null', 'prim'], 'desc': 'Old value if it was previously set, or none.', }}}, {'name': 'list', 'desc': 'List the user profile vars and their values.', 'type': {'type': 'function', '_funcname': '_list', 'returns': {'type': 'list', 'desc': 'A list of tuples containing var, value pairs.', }}}, ) _storm_typename = 'user:profile:dict' _ismutable = True def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt self.locls.update(self.getObjLocals())
[docs] def getObjLocals(self): return { 'get': self._get, 'pop': self._pop, 'set': self._set, 'list': self._list, }
@s_stormtypes.stormfunc(readonly=True) async def _get(self, name, default=None): name = await s_stormtypes.tostr(name) return await self.runt.snap.core.getUserProfInfo(self.valu, name, default=default) async def _pop(self, name, default=None): name = await s_stormtypes.tostr(name) return await self.runt.snap.core.popUserProfInfo(self.valu, name, default=default) async def _set(self, name, valu): if not isinstance(name, str): mesg = 'The name of a variable must be a string.' raise s_exc.StormRuntimeError(mesg=mesg, name=name) name = await s_stormtypes.tostr(name) oldv = await self.runt.snap.core.getUserProfInfo(self.valu, name) valu = await s_stormtypes.toprim(valu) await self.runt.snap.core.setUserProfInfo(self.valu, name, valu) return oldv @s_stormtypes.stormfunc(readonly=True) async def _list(self): valu = await self.value() return list(valu.items())
[docs] async def iter(self): async for name, valu in self.runt.snap.core.iterUserProfInfo(self.valu): yield name, valu await asyncio.sleep(0)
[docs] async def value(self): return await self.runt.snap.core.getUserProfile(self.valu)
[docs] @s_stormtypes.registry.registerLib class LibUser(s_stormtypes.Lib): ''' A Storm Library for interacting with data about the current user. ''' _storm_locals = ( {'name': 'name', 'desc': 'Get the name of the current runtime user.', 'type': {'type': 'function', '_funcname': '_libUserName', 'returns': {'type': 'str', 'desc': 'The username.', }}}, {'name': 'allowed', 'desc': 'Check if the current user has a given permission.', 'type': {'type': 'function', '_funcname': '_libUserAllowed', 'args': ( {'name': 'permname', 'type': 'str', 'desc': 'The permission string to check.', }, {'name': 'gateiden', 'type': 'str', 'desc': 'The authgate iden.', 'default': None, }, {'name': 'default', 'type': 'boolean', 'desc': 'The default value.', 'default': False, }, ), 'returns': {'type': 'boolean', 'desc': 'True if the user has the requested permission, false otherwise.', }}}, {'name': 'vars', 'desc': "Get a dictionary representing the current user's persistent variables.", 'type': 'auth:user:vars', }, {'name': 'profile', 'desc': "Get a dictionary representing the current user's profile information.", 'type': 'auth:user:profile', }, {'name': 'iden', 'desc': 'The user GUID for the current storm user.', 'type': 'str'}, ) _storm_lib_path = ('user', )
[docs] def getObjLocals(self): return { 'name': self._libUserName, 'iden': self.runt.user.iden, 'allowed': self._libUserAllowed, }
[docs] def addLibFuncs(self): super().addLibFuncs() self.locls.update({ 'vars': StormUserVarsDict(self.runt, self.runt.user.iden), 'json': UserJson(self.runt, self.runt.user.iden), 'profile': StormUserProfileDict(self.runt, self.runt.user.iden), })
@s_stormtypes.stormfunc(readonly=True) async def _libUserName(self): return self.runt.user.name @s_stormtypes.stormfunc(readonly=True) async def _libUserAllowed(self, permname, gateiden=None, default=False): permname = await s_stormtypes.toprim(permname) gateiden = await s_stormtypes.tostr(gateiden, noneok=True) default = await s_stormtypes.tobool(default) perm = permname.split('.') return self.runt.user.allowed(perm, gateiden=gateiden, default=default)
[docs] @s_stormtypes.registry.registerLib class LibUsers(s_stormtypes.Lib): ''' A Storm Library for interacting with Auth Users in the Cortex. ''' _storm_locals = ( {'name': 'add', 'desc': 'Add a User to the Cortex.', 'type': {'type': 'function', '_funcname': '_methUsersAdd', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the user.', }, {'name': 'passwd', 'type': 'str', 'desc': "The user's password.", 'default': None, }, {'name': 'email', 'type': 'str', 'desc': "The user's email address.", 'default': None, }, {'name': 'iden', 'type': 'str', 'desc': 'The iden to use to create the user.', 'default': None, } ), 'returns': {'type': 'auth:user', 'desc': 'The ``auth:user`` object for the new user.', }}}, {'name': 'del', 'desc': 'Delete a User from the Cortex.', 'type': {'type': 'function', '_funcname': '_methUsersDel', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the user to delete.', }, ), 'returns': {'type': 'null', }}}, {'name': 'list', 'desc': 'Get a list of Users in the Cortex.', 'type': {'type': 'function', '_funcname': '_methUsersList', 'returns': {'type': 'list', 'desc': 'A list of ``auth:user`` objects.', }}}, {'name': 'get', 'desc': 'Get a specific User by iden.', 'type': {'type': 'function', '_funcname': '_methUsersGet', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the user to retrieve.', }, ), 'returns': {'type': ['null', 'auth:user'], 'desc': 'The ``auth:user`` object, or none if the user does not exist.', }}}, {'name': 'byname', 'desc': 'Get a specific user by name.', 'type': {'type': 'function', '_funcname': '_methUsersByName', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the user to retrieve.', }, ), 'returns': {'type': ['null', 'auth:user'], 'desc': 'The ``auth:user`` object, or none if the user does not exist.', }}}, ) _storm_lib_path = ('auth', 'users') _storm_lib_perms = ( {'perm': ('auth', 'role', 'set', 'name'), 'gate': 'cortex', 'desc': 'Permits a user to change the name of a role.'}, {'perm': ('auth', 'role', 'set', 'rules'), 'gate': 'cortex', 'desc': 'Permits a user to modify rules of a role.'}, {'perm': ('auth', 'self', 'set', 'email'), 'gate': 'cortex', 'desc': 'Permits a user to change their own email address.', 'default': True}, {'perm': ('auth', 'self', 'set', 'name'), 'gate': 'cortex', 'desc': 'Permits a user to change their own username.', 'default': True}, {'perm': ('auth', 'self', 'set', 'passwd'), 'gate': 'cortex', 'desc': 'Permits a user to change their own password.', 'default': True}, {'perm': ('auth', 'self', 'set', 'apikey'), 'gate': 'cortex', 'desc': 'Permits a user to manage their API keys.', 'default': True}, {'perm': ('auth', 'user', 'grant'), 'gate': 'cortex', 'desc': 'Controls granting roles to a user.'}, {'perm': ('auth', 'user', 'revoke'), 'gate': 'cortex', 'desc': 'Controls revoking roles from a user.'}, {'perm': ('auth', 'user', 'set', 'admin'), 'gate': 'cortex', 'desc': 'Controls setting/removing a user\'s admin status.'}, {'perm': ('auth', 'user', 'set', 'email'), 'gate': 'cortex', 'desc': 'Controls changing a user\'s email address.'}, {'perm': ('auth', 'user', 'set', 'locked'), 'gate': 'cortex', 'desc': 'Controls locking/unlocking a user account.'}, {'perm': ('auth', 'user', 'set', 'archived'), 'gate': 'cortex', 'desc': 'Controls archiving/unarchiving a user account.'}, {'perm': ('auth', 'user', 'set', 'passwd'), 'gate': 'cortex', 'desc': 'Controls changing a user password.'}, {'perm': ('auth', 'user', 'set', 'rules'), 'gate': 'cortex', 'desc': 'Controls adding rules to a user.'}, {'perm': ('auth', 'user', 'get', 'profile', '<name>'), 'gate': 'cortex', 'desc': 'Permits a user to retrieve their profile information.', 'ex': 'auth.user.get.profile.fullname'}, {'perm': ('auth', 'user', 'pop', 'profile', '<name>'), 'gate': 'cortex', 'desc': 'Permits a user to remove profile information.', 'ex': 'auth.user.pop.profile.fullname'}, {'perm': ('auth', 'user', 'set', 'profile', '<name>'), 'gate': 'cortex', 'desc': 'Permits a user to set profile information.', 'ex': 'auth.user.set.profile.fullname'}, {'perm': ('auth', 'user', 'set', 'apikey'), 'gate': 'cortex', 'desc': 'Permits a user to manage API keys for other users. USE WITH CAUTUON!'}, {'perm': ('storm', 'lib', 'auth', 'users', 'add'), 'gate': 'cortex', 'desc': 'Controls the ability to add a user to the system. USE WITH CAUTION!'}, {'perm': ('storm', 'lib', 'auth', 'users', 'del'), 'gate': 'cortex', 'desc': 'Controls the ability to remove a user from the system. USE WITH CAUTION!'}, )
[docs] def getObjLocals(self): return { 'add': self._methUsersAdd, 'del': self._methUsersDel, 'list': self._methUsersList, 'get': self._methUsersGet, 'byname': self._methUsersByName, }
@s_stormtypes.stormfunc(readonly=True) async def _methUsersList(self): return [User(self.runt, udef['iden']) for udef in await self.runt.snap.core.getUserDefs()] @s_stormtypes.stormfunc(readonly=True) async def _methUsersGet(self, iden): udef = await self.runt.snap.core.getUserDef(iden) if udef is not None: return User(self.runt, udef['iden']) @s_stormtypes.stormfunc(readonly=True) async def _methUsersByName(self, name): udef = await self.runt.snap.core.getUserDefByName(name) if udef is not None: return User(self.runt, udef['iden']) async def _methUsersAdd(self, name, passwd=None, email=None, iden=None): if not self.runt.allowed(('auth', 'user', 'add')): self.runt.confirm(('storm', 'lib', 'auth', 'users', 'add')) name = await s_stormtypes.tostr(name) iden = await s_stormtypes.tostr(iden, True) email = await s_stormtypes.tostr(email, True) passwd = await s_stormtypes.tostr(passwd, True) udef = await self.runt.snap.core.addUser(name, passwd=passwd, email=email, iden=iden,) return User(self.runt, udef['iden']) async def _methUsersDel(self, iden): if not self.runt.allowed(('auth', 'user', 'del')): self.runt.confirm(('storm', 'lib', 'auth', 'users', 'del')) await self.runt.snap.core.delUser(iden)
[docs] @s_stormtypes.registry.registerLib class LibRoles(s_stormtypes.Lib): ''' A Storm Library for interacting with Auth Roles in the Cortex. ''' _storm_locals = ( {'name': 'add', 'desc': 'Add a Role to the Cortex.', 'type': {'type': 'function', '_funcname': '_methRolesAdd', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the role.', }, {'name': 'iden', 'type': 'str', 'desc': 'The iden to assign to the new role.', 'default': None}, ), 'returns': {'type': 'auth:role', 'desc': 'The new role object.', }}}, {'name': 'del', 'desc': 'Delete a Role from the Cortex.', 'type': {'type': 'function', '_funcname': '_methRolesDel', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the role to delete.', }, ), 'returns': {'type': 'null', }}}, {'name': 'list', 'desc': 'Get a list of Roles in the Cortex.', 'type': {'type': 'function', '_funcname': '_methRolesList', 'returns': {'type': 'list', 'desc': 'A list of ``auth:role`` objects.', }}}, {'name': 'get', 'desc': 'Get a specific Role by iden.', 'type': {'type': 'function', '_funcname': '_methRolesGet', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the role to retrieve.', }, ), 'returns': {'type': ['null', 'auth:role'], 'desc': 'The ``auth:role`` object; or null if the role does not exist.', }}}, {'name': 'byname', 'desc': 'Get a specific Role by name.', 'type': {'type': 'function', '_funcname': '_methRolesByName', 'args': ( {'name': 'name', 'type': 'str', 'desc': 'The name of the role to retrieve.', }, ), 'returns': {'type': ['null', 'auth:role'], 'desc': 'The role by name, or null if it does not exist.', }}}, ) _storm_lib_path = ('auth', 'roles') _storm_lib_perms = ( {'perm': ('storm', 'lib', 'auth', 'roles', 'add'), 'gate': 'cortex', 'desc': 'Controls the ability to add a role to the system. USE WITH CAUTION!'}, {'perm': ('storm', 'lib', 'auth', 'roles', 'del'), 'gate': 'cortex', 'desc': 'Controls the ability to remove a role from the system. USE WITH CAUTION!'}, )
[docs] def getObjLocals(self): return { 'add': self._methRolesAdd, 'del': self._methRolesDel, 'list': self._methRolesList, 'get': self._methRolesGet, 'byname': self._methRolesByName, }
@s_stormtypes.stormfunc(readonly=True) async def _methRolesList(self): return [Role(self.runt, rdef['iden']) for rdef in await self.runt.snap.core.getRoleDefs()] @s_stormtypes.stormfunc(readonly=True) async def _methRolesGet(self, iden): rdef = await self.runt.snap.core.getRoleDef(iden) if rdef is not None: return Role(self.runt, rdef['iden']) @s_stormtypes.stormfunc(readonly=True) async def _methRolesByName(self, name): rdef = await self.runt.snap.core.getRoleDefByName(name) if rdef is not None: return Role(self.runt, rdef['iden']) async def _methRolesAdd(self, name, iden=None): if not self.runt.allowed(('auth', 'role', 'add')): self.runt.confirm(('storm', 'lib', 'auth', 'roles', 'add')) iden = await s_stormtypes.tostr(iden, noneok=True) rdef = await self.runt.snap.core.addRole(name, iden=iden) return Role(self.runt, rdef['iden']) async def _methRolesDel(self, iden): if not self.runt.allowed(('auth', 'role', 'del')): self.runt.confirm(('storm', 'lib', 'auth', 'roles', 'del')) await self.runt.snap.core.delRole(iden)
[docs] @s_stormtypes.registry.registerLib class LibGates(s_stormtypes.Lib): ''' A Storm Library for interacting with Auth Gates in the Cortex. ''' _storm_locals = ( {'name': 'get', 'desc': 'Get a specific Gate by iden.', 'type': {'type': 'function', '_funcname': '_methGatesGet', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The iden of the gate to retrieve.', }, ), 'returns': {'type': ['null', 'auth:gate'], 'desc': 'The ``auth:gate`` if it exists, otherwise null.', }}}, {'name': 'list', 'desc': 'Get a list of Gates in the Cortex.', 'type': {'type': 'function', '_funcname': '_methGatesList', 'returns': {'type': 'list', 'desc': 'A list of ``auth:gate`` objects.', }}}, ) _storm_lib_path = ('auth', 'gates')
[docs] def getObjLocals(self): return { 'get': self._methGatesGet, 'list': self._methGatesList, }
@s_stormtypes.stormfunc(readonly=True) async def _methGatesList(self): todo = s_common.todo('getAuthGates') gates = await self.runt.coreDynCall(todo) return [Gate(self.runt, g) for g in gates] @s_stormtypes.stormfunc(readonly=True) async def _methGatesGet(self, iden): iden = await s_stormtypes.toprim(iden) todo = s_common.todo('getAuthGate', iden) gate = await self.runt.coreDynCall(todo) if gate: return Gate(self.runt, gate)
[docs] @s_stormtypes.registry.registerType class Gate(s_stormtypes.Prim): ''' Implements the Storm API for an AuthGate. ''' _storm_locals = ( {'name': 'iden', 'desc': 'The iden of the AuthGate.', 'type': 'str', }, {'name': 'type', 'desc': 'The type of the AuthGate.', 'type': 'str', }, {'name': 'roles', 'desc': 'The role idens which are a member of the Authgate.', 'type': 'list', }, {'name': 'users', 'desc': 'The user idens which are a member of the Authgate.', 'type': 'list', }, ) _storm_typename = 'auth:gate' _ismutable = False def __init__(self, runt, valu, path=None): s_stormtypes.Prim.__init__(self, valu, path=path) self.runt = runt self.locls.update({ 'iden': self.valu.get('iden'), 'type': self.valu.get('type'), 'roles': self.valu.get('roles', ()), 'users': self.valu.get('users', ()), }) def __hash__(self): return hash((self._storm_typename, self.locls['iden']))