Source code for synapse.lib.stormlib.oauth

import yarl
from oauthlib import oauth1

import synapse.exc as s_exc

import synapse.lib.stormtypes as s_stormtypes

[docs]@s_stormtypes.registry.registerLib class OAuthV1Lib(s_stormtypes.Lib): ''' A Storm library to handle OAuth v1 authentication. ''' _storm_locals = ( { 'name': 'client', 'desc': ''' Initialize an OAuthV1 Client to use for signing/authentication. ''', 'type': { 'type': 'function', '_funcname': '_methClient', 'args': ( {'name': 'ckey', 'type': 'str', 'desc': 'The OAuthV1 Consumer Key to store and use for signing requests.'}, {'name': 'csecret', 'type': 'str', 'desc': 'The OAuthV1 Consumer Secret used to sign requests.'}, {'name': 'atoken', 'type': 'str', 'desc': 'The OAuthV1 Access Token (or resource owner key) to use to sign requests.)'}, {'name': 'asecret', 'type': 'str', 'desc': 'The OAuthV1 Access Token Secret (or resource owner secret) to use to sign requests.'}, {'name': 'sigtype', 'type': 'str', 'default': oauth1.SIGNATURE_TYPE_QUERY, 'desc': 'Where to populate the signature (in the HTTP body, in the query parameters, or in the header)'}, ), 'returns': { 'type': 'inet:http:oauth:v1:client', 'desc': 'An OAuthV1 client to be used to sign requests.', } }, }, ) _storm_lib_path = ('inet', 'http', 'oauth', 'v1',)
[docs] def getObjLocals(self): return { 'client': self._methClient, 'SIG_BODY': oauth1.SIGNATURE_TYPE_BODY, 'SIG_QUERY': oauth1.SIGNATURE_TYPE_QUERY, 'SIG_HEADER': oauth1.SIGNATURE_TYPE_AUTH_HEADER, }
async def _methClient(self, ckey, csecret, atoken, asecret, sigtype=oauth1.SIGNATURE_TYPE_QUERY): return OAuthV1Client(self.runt, ckey, csecret, atoken, asecret, sigtype)
[docs]@s_stormtypes.registry.registerType class OAuthV1Client(s_stormtypes.StormType): ''' A client for doing OAuth V1 Authentication from Storm. ''' _storm_locals = ( { 'name': 'sign', 'desc': ''' Sign an OAuth request to a particular URL. ''', 'type': { 'type': 'function', '_funcname': '_methSign', 'args': ( {'name': 'baseurl', 'type': 'str', 'desc': 'The base url to sign and query.'}, {'name': 'method', 'type': 'dict', 'default': 'GET', 'desc': 'The HTTP Method to use as part of signing.'}, {'name': 'headers', 'type': 'dict', 'default': None, 'desc': 'Optional headers used for signing. Can override the "Content-Type" header if the signature type is set to SIG_BODY'}, {'name': 'params', 'type': 'dict', 'default': None, 'desc': 'Optional query parameters to pass to url construction and/or signing.'}, {'name': 'body', 'type': 'bytes', 'default': None, 'desc': 'Optional HTTP body to pass to request signing.'}, ), 'returns': { 'type': 'list', 'desc': 'A 3-element tuple of ($url, $headers, $body). The OAuth signature elements will be embedded in the element specified when constructing the client.' }, }, }, ) _storm_typename = 'inet:http:oauth:v1:client' def __init__(self, runt, ckey, csecret, atoken, asecret, sigtype, path=None): s_stormtypes.StormType.__init__(self, path=path) self.runt = runt self.locls.update(self.getObjLocals()) self.sigtype = sigtype self.client = oauth1.Client( ckey, client_secret=csecret, resource_owner_key=atoken, resource_owner_secret=asecret, signature_type=sigtype )
[docs] def getObjLocals(self): return { 'sign': self._methSign, }
async def _methSign(self, baseurl, method='GET', headers=None, params=None, body=None): url = yarl.URL(baseurl).with_query(await s_stormtypes.toprim(params)) headers = await s_stormtypes.toprim(headers) body = await s_stormtypes.toprim(body) if self.sigtype == oauth1.SIGNATURE_TYPE_BODY: if not headers: headers = {'Content-Type': oauth1.rfc5849.CONTENT_TYPE_FORM_URLENCODED} else: headers['Content-Type'] = oauth1.rfc5849.CONTENT_TYPE_FORM_URLENCODED try: return self.client.sign(str(url), http_method=method, headers=headers, body=body) except ValueError as e: mesg = f'Request signing failed ({str(e)})' raise s_exc.StormRuntimeError(mesg=mesg) from None
[docs]@s_stormtypes.registry.registerLib class OAuthV2Lib(s_stormtypes.Lib): ''' A Storm library for managing OAuth V2 clients. ''' _storm_lib_path = ('inet', 'http', 'oauth', 'v2') _storm_locals = ( { 'name': 'addProvider', 'desc': ''' Add a new provider configuration. Example: Add a new provider which uses the authorization code flow:: $iden = $lib.guid(example, provider, oauth) $conf = ({ "iden": $iden, "name": "example_provider", "client_id": "yourclientid", "client_secret": "yourclientsecret", "scope": "first_scope second_scope", "auth_uri": "https://provider.com/auth", "token_uri": "https://provider.com/token", "redirect_uri": "https://local.redirect.com/oauth", }) // Optionally enable PKCE $conf.extensions = ({"pkce": $lib.true}) // Optionally disable SSL verification $conf.ssl_verify = $lib.false // Optionally provide additional key-val parameters // to include when calling the auth URI $conf.extra_auth_params = ({"customparam": "foo"}) $lib.inet.http.oauth.v2.addProvider($conf) ''', 'type': { 'type': 'function', '_funcname': '_addProvider', 'args': ( {'name': 'conf', 'type': 'dict', 'desc': 'A provider configuration.'}, ), 'returns': {'type': 'null'}, }, }, { 'name': 'delProvider', 'desc': 'Delete a provider configuration.', 'type': { 'type': 'function', '_funcname': '_delProvider', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The provider iden.'}, ), 'returns': {'type': 'dict', 'desc': 'The deleted provider configuration or None if it does not exist.'} }, }, { 'name': 'getProvider', 'desc': 'Get a provider configuration', 'type': { 'type': 'function', '_funcname': '_getProvider', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The provider iden.'}, ), 'returns': {'type': 'dict', 'desc': 'The provider configuration or None if it does not exist.'} }, }, { 'name': 'listProviders', 'desc': 'List provider configurations', 'type': { 'type': 'function', '_funcname': '_listProviders', 'returns': {'type': 'list', 'desc': 'List of (iden, conf) tuples.'}, }, }, { 'name': 'setUserAuthCode', 'desc': 'Set the auth code for the current user.', 'type': { 'type': 'function', '_funcname': '_setUserAuthCode', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The provider iden.'}, {'name': 'authcode', 'type': 'str', 'desc': 'The auth code for the user.'}, {'name': 'code_verifier', 'type': 'str', 'default': None, 'desc': 'Optional PKCE code verifier.'}, ), 'returns': {'type': 'null'} }, }, { 'name': 'getUserAccessToken', 'desc': ''' Get the provider access token for the current user. Example: Retrieve the token and handle needing an auth code:: $provideriden = $lib.globals.get("oauth:myprovider") ($ok, $data) = $lib.inet.http.oauth.v2.getUserAccessToken($provideriden) if $ok { // $data is the token to be used in a request else { // $data is a message stating why the token is not available // caller should now handle retrieving a new auth code for the user } ''', 'type': { 'type': 'function', '_funcname': '_getUserAccessToken', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The provider iden.'}, ), 'returns': {'type': 'list', 'desc': 'List of (<bool>, <token/mesg>) for status and data.'}, }, }, { 'name': 'clearUserAccessToken', 'desc': 'Clear the stored refresh data for the current user\'s provider access token.', 'type': { 'type': 'function', '_funcname': '_clearUserAccessToken', 'args': ( {'name': 'iden', 'type': 'str', 'desc': 'The provider iden.'}, ), 'returns': {'type': 'dict', 'desc': 'The existing token state data or None if it did not exist.'}, }, } )
[docs] def getObjLocals(self): return { 'addProvider': self._addProvider, 'delProvider': self._delProvider, 'getProvider': self._getProvider, 'listProviders': self._listProviders, 'setUserAuthCode': self._setUserAuthCode, 'getUserAccessToken': self._getUserAccessToken, 'clearUserAccessToken': self._clearUserAccessToken, }
async def _addProvider(self, conf): if not self.runt.isAdmin(): raise s_exc.AuthDeny(mesg='addProvider() requires admin privs.', user=self.runt.user.iden, username=self.runt.user.name) conf = await s_stormtypes.toprim(conf) await self.runt.snap.core.addOAuthProvider(conf) async def _delProvider(self, iden): if not self.runt.isAdmin(): raise s_exc.AuthDeny(mesg='delProvider() requires admin privs.', user=self.runt.user.iden, username=self.runt.user.name) iden = await s_stormtypes.tostr(iden) return await self.runt.snap.core.delOAuthProvider(iden) async def _getProvider(self, iden): if not self.runt.isAdmin(): raise s_exc.AuthDeny(mesg='getProvider() requires admin privs.', user=self.runt.user.iden, username=self.runt.user.name) iden = await s_stormtypes.tostr(iden) return await self.runt.snap.core.getOAuthProvider(iden) async def _listProviders(self): if not self.runt.isAdmin(): raise s_exc.AuthDeny(mesg='listProviders() requires admin privs.', user=self.runt.user.iden, username=self.runt.user.name) return await self.runt.snap.core.listOAuthProviders() async def _setUserAuthCode(self, iden, authcode, code_verifier=None): iden = await s_stormtypes.tostr(iden) authcode = await s_stormtypes.tostr(authcode) code_verifier = await s_stormtypes.tostr(code_verifier, True) useriden = self.runt.user.iden await self.runt.snap.core.setOAuthAuthCode(iden, useriden, authcode, code_verifier=code_verifier) async def _getUserAccessToken(self, iden): iden = await s_stormtypes.tostr(iden) return await self.runt.snap.core.getOAuthAccessToken(iden, self.runt.user.iden) async def _clearUserAccessToken(self, iden): iden = await s_stormtypes.tostr(iden) return await self.runt.snap.core.clearOAuthAccessToken(iden, self.runt.user.iden)