Source code for synapse.lib.stormhttp

import json
import asyncio
import logging
import urllib.parse

logger = logging.getLogger(__name__)

import aiohttp
import aiohttp_socks

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.base as s_base
import synapse.lib.msgpack as s_msgpack
import synapse.lib.version as s_version
import synapse.lib.stormtypes as s_stormtypes

@s_stormtypes.registry.registerType
class WebSocket(s_base.Base, s_stormtypes.StormType):
    '''
    Implements the Storm API for a Websocket.
    '''
    _storm_typename = 'inet:http:socket'

    _storm_locals = (

        {'name': 'tx', 'desc': 'Transmit a message over the web socket.',
         'type': {'type': 'function', '_funcname': 'tx',
                  'args': (
                      {'name': 'mesg', 'type': 'dict', 'desc': 'A JSON compatible message.', },
                  ),
                  'returns': {'type': 'list', 'desc': 'An ($ok, $valu) tuple.'}}},

        {'name': 'rx', 'desc': 'Receive a message from the web socket.',
         'type': {'type': 'function', '_funcname': 'rx',
                  'args': (
                      {'name': 'timeout', 'type': 'int', 'desc': 'The timeout to wait for',
                       'default': None, },
                  ),
                  'returns': {'type': 'list', 'desc': 'An ($ok, $valu) tuple.'}}},
    )

    async def __anit__(self):
        # Initialize both bases explicitly, then publish the Storm-callable locals.
        await s_base.Base.__anit__(self)
        s_stormtypes.StormType.__init__(self)
        self.locls.update(self.getObjLocals())

    def getObjLocals(self):
        '''
        Return the mapping of Storm local names to their bound implementations.
        '''
        return dict(tx=self.tx, rx=self.rx)

    async def tx(self, mesg):
        '''
        Serialize a message as JSON and send it as a binary websocket frame.

        Returns:
            tuple: (True, None) on success, otherwise the value of s_common.retnexc().
        '''
        try:
            prim = await s_stormtypes.toprim(mesg)
            byts = json.dumps(prim).encode()
            await self.resp.send_bytes(byts)

        except asyncio.CancelledError:  # pragma: no cover
            raise

        except Exception as e:  # pragma: no cover
            return s_common.retnexc(e)

        return (True, None)

    async def rx(self, timeout=None):
        '''
        Receive one websocket message and decode its payload as JSON.

        Args:
            timeout: Passed through to s_common.wait_for() as the timeout.

        Returns:
            tuple: (True, <decoded>) for BINARY / TEXT messages, (True, None) for CLOSED,
            a (False, ('BadMesgFormat', info)) tuple for any other message type, or the
            value of s_common.retnexc() if an exception occurred.
        '''
        try:
            frame = await s_common.wait_for(self.resp.receive(), timeout=timeout)
            kind, payload, _ = frame

            if kind == aiohttp.WSMsgType.BINARY:
                return (True, json.loads(payload))

            elif kind == aiohttp.WSMsgType.TEXT:
                return (True, json.loads(payload.encode()))

            elif kind == aiohttp.WSMsgType.CLOSED:  # pragma: no cover
                return (True, None)

            errinfo = {'mesg': f'WebSocket RX unhandled type: {kind.name}'}
            return (False, ('BadMesgFormat', errinfo))  # pragma: no cover

        except asyncio.CancelledError:  # pragma: no cover
            raise

        except Exception as e:  # pragma: no cover
            return s_common.retnexc(e)
@s_stormtypes.registry.registerLib
class LibHttp(s_stormtypes.Lib):
    '''
    A Storm Library exposing an HTTP client API.

    For APIs that accept an ssl_opts argument, the dictionary may contain the following values::

        {
            'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl_verify argument.
            'client_cert': <str> - PEM encoded full chain certificate for use in mTLS.
            'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert.
            'ca_cert': <str> - A PEM encoded full chain CA certificate for use when verifying the request.
        }
    '''
    _storm_locals = (
        {'name': 'get', 'desc': 'Get the contents of a given URL.',
         'type': {'type': 'function', '_funcname': '_httpEasyGet',
                  'args': (
                      {'name': 'url', 'type': 'str', 'desc': 'The URL to retrieve.', },
                      {'name': 'headers', 'type': 'dict', 'desc': 'HTTP headers to send with the request.',
                       'default': None},
                      {'name': 'ssl_verify', 'type': 'boolean', 'desc': 'Perform SSL/TLS verification.',
                       'default': True},
                      {'name': 'params', 'type': 'dict', 'desc': 'Optional parameters which may be passed to the request.',
                       'default': None},
                      {'name': 'timeout', 'type': 'int', 'desc': 'Total timeout for the request in seconds.',
                       'default': 300},
                      {'name': 'allow_redirects', 'type': 'bool', 'desc': 'If set to false, do not follow redirects.',
                       'default': True},
                      {'name': 'proxy', 'type': ['bool', 'null', 'str'],
                       'desc': 'Set to a proxy URL string or $lib.false to disable proxy use.', 'default': None},
                      {'name': 'ssl_opts', 'type': 'dict',
                       'desc': 'Optional SSL/TLS options. See $lib.inet.http help for additional details.',
                       'default': None},
                  ),
                  'returns': {'type': 'inet:http:resp', 'desc': 'The response object.'}}},
        {'name': 'post', 'desc': 'Post data to a given URL.',
         'type': {'type': 'function', '_funcname': '_httpPost',
                  'args': (
                      {'name': 'url', 'type': 'str', 'desc': 'The URL to post to.', },
                      {'name': 'headers', 'type': 'dict', 'desc': 'HTTP headers to send with the request.',
                       'default': None},
                      {'name': 'json', 'type': 'prim', 'desc': 'The data to post, as JSON object.',
                       'default': None},
                      {'name': 'body', 'type': 'bytes', 'desc': 'The data to post, as binary object.',
                       'default': None},
                      {'name': 'ssl_verify', 'type': 'boolean', 'desc': 'Perform SSL/TLS verification.',
                       'default': True},
                      {'name': 'params', 'type': 'dict', 'desc': 'Optional parameters which may be passed to the request.',
                       'default': None},
                      {'name': 'timeout', 'type': 'int', 'desc': 'Total timeout for the request in seconds.',
                       'default': 300},
                      {'name': 'allow_redirects', 'type': 'bool', 'desc': 'If set to false, do not follow redirects.',
                       'default': True},
                      {'name': 'fields', 'type': 'list',
                       'desc': 'A list of info dictionaries containing the name, value or sha256, '
                               'and additional parameters for fields to post, as multipart/form-data. '
                               'If a sha256 is specified, the request will be sent from the axon '
                               'and the corresponding file will be uploaded as the value for '
                               'the field.',
                       'default': None},
                      {'name': 'proxy', 'type': ['bool', 'null', 'str'],
                       'desc': 'Set to a proxy URL string or $lib.false to disable proxy use.', 'default': None},
                      {'name': 'ssl_opts', 'type': 'dict',
                       'desc': 'Optional SSL/TLS options. See $lib.inet.http help for additional details.',
                       'default': None},
                  ),
                  'returns': {'type': 'inet:http:resp', 'desc': 'The response object.'}}},
        {'name': 'head', 'desc': 'Get the HEAD response for a URL.',
         'type': {'type': 'function', '_funcname': '_httpEasyHead',
                  'args': (
                      {'name': 'url', 'type': 'str', 'desc': 'The URL to retrieve.'},
                      {'name': 'headers', 'type': 'dict', 'desc': 'HTTP headers to send with the request.',
                       'default': None},
                      {'name': 'ssl_verify', 'type': 'boolean', 'desc': 'Perform SSL/TLS verification.',
                       'default': True},
                      {'name': 'params', 'type': 'dict', 'desc': 'Optional parameters which may be passed to the request.',
                       'default': None},
                      {'name': 'timeout', 'type': 'int', 'desc': 'Total timeout for the request in seconds.',
                       'default': 300, },
                      {'name': 'allow_redirects', 'type': 'bool', 'desc': 'If set to true, follow redirects.',
                       'default': False},
                      {'name': 'proxy', 'type': ['bool', 'null', 'str'],
                       'desc': 'Set to a proxy URL string or $lib.false to disable proxy use.', 'default': None},
                      {'name': 'ssl_opts', 'type': 'dict',
                       'desc': 'Optional SSL/TLS options. See $lib.inet.http help for additional details.',
                       'default': None},
                  ),
                  'returns': {'type': 'inet:http:resp', 'desc': 'The response object.'}}},
        {'name': 'request', 'desc': 'Make an HTTP request using the given HTTP method to the url.',
         'type': {'type': 'function', '_funcname': '_httpRequest',
                  'args': (
                      {'name': 'meth', 'type': 'str', 'desc': 'The HTTP method. (ex. PUT)'},
                      {'name': 'url', 'type': 'str', 'desc': 'The URL to send the request to.'},
                      {'name': 'headers', 'type': 'dict', 'desc': 'HTTP headers to send with the request.',
                       'default': None},
                      {'name': 'json', 'type': 'prim', 'desc': 'The data to include in the body, as JSON object.',
                       'default': None},
                      {'name': 'body', 'type': 'bytes', 'desc': 'The data to include in the body, as binary object.',
                       'default': None},
                      {'name': 'ssl_verify', 'type': 'boolean', 'desc': 'Perform SSL/TLS verification.',
                       'default': True},
                      {'name': 'params', 'type': 'dict', 'desc': 'Optional parameters which may be passed to the request.',
                       'default': None},
                      {'name': 'timeout', 'type': 'int', 'desc': 'Total timeout for the request in seconds.',
                       'default': 300},
                      {'name': 'allow_redirects', 'type': 'bool', 'desc': 'If set to false, do not follow redirects.',
                       'default': True},
                      {'name': 'fields', 'type': 'list',
                       'desc': 'A list of info dictionaries containing the name, value or sha256, '
                               'and additional parameters for fields to post, as multipart/form-data. '
                               'If a sha256 is specified, the request will be sent from the axon '
                               'and the corresponding file will be uploaded as the value for '
                               'the field.',
                       'default': None},
                      {'name': 'proxy', 'type': ['bool', 'null', 'str'],
                       'desc': 'Set to a proxy URL string or $lib.false to disable proxy use.', 'default': None},
                      {'name': 'ssl_opts', 'type': 'dict',
                       'desc': 'Optional SSL/TLS options. See $lib.inet.http help for additional details.',
                       'default': None},
                  ),
                  'returns': {'type': 'inet:http:resp', 'desc': 'The response object.'}
                  }
         },
        {'name': 'connect', 'desc': 'Connect a web socket to tx/rx JSON messages.',
         'type': {'type': 'function', '_funcname': 'inetHttpConnect',
                  'args': (
                      {'name': 'url', 'type': 'str', 'desc': 'The URL to retrieve.'},
                      {'name': 'headers', 'type': 'dict', 'desc': 'HTTP headers to send with the request.',
                       'default': None},
                      {'name': 'ssl_verify', 'type': 'boolean', 'desc': 'Perform SSL/TLS verification.',
                       'default': True},
                      {'name': 'timeout', 'type': 'int', 'desc': 'Total timeout for the request in seconds.',
                       'default': 300},
                      {'name': 'params', 'type': 'dict', 'desc': 'Optional parameters which may be passed to the connection request.',
                       'default': None},
                      {'name': 'proxy', 'type': ['bool', 'null', 'str'],
                       'desc': 'Set to a proxy URL string or $lib.false to disable proxy use.', 'default': None},
                      {'name': 'ssl_opts', 'type': 'dict',
                       'desc': 'Optional SSL/TLS options. See $lib.inet.http help for additional details.',
                       'default': None},
                  ),
                  'returns': {'type': 'inet:http:socket', 'desc': 'A websocket object.'}}},
        {'name': 'urlencode', 'desc': '''
            Urlencode a text string.

            This will replace special characters in a string using the %xx escape and
            replace spaces with plus signs.

            Examples:
                Urlencode a string::

                    $str=$lib.inet.http.urlencode("http://google.com")
         ''',
         'type': {'type': 'function', '_funcname': 'urlencode',
                  'args': (
                      {'name': 'text', 'type': 'str', 'desc': 'The text string.', },
                  ),
                  'returns': {'type': 'str', 'desc': 'The urlencoded string.', }}},
        {'name': 'urldecode', 'desc': '''
            Urldecode a text string.

            This will replace %xx escape characters with the special characters they represent
            and replace plus signs with spaces.

            Examples:
                Urldecode a string::

                    $str=$lib.inet.http.urldecode("http%3A%2F%2Fgo+ogle.com")
         ''',
         'type': {'type': 'function', '_funcname': 'urldecode',
                  'args': (
                      {'name': 'text', 'type': 'str', 'desc': 'The text string.', },
                  ),
                  'returns': {'type': 'str', 'desc': 'The urldecoded string.', }}},
        {'name': 'codereason', 'desc': '''
            Get the reason phrase for an HTTP status code.

            Examples:
                Get the reason for a 404 status code::

                    $str=$lib.inet.http.codereason(404)
         ''',
         'type': {'type': 'function', '_funcname': 'codereason',
                  'args': (
                      {'name': 'code', 'type': 'int', 'desc': 'The HTTP status code.', },
                  ),
                  'returns': {'type': 'str', 'desc': 'The reason phrase for the status code.', }}},
    )
    _storm_lib_path = ('inet', 'http')
    _storm_lib_perms = (
        {'perm': ('storm', 'lib', 'inet', 'http', 'proxy'), 'gate': 'cortex',
         'desc': 'Permits a user to specify the proxy used with `$lib.inet.http` APIs.'},
    )

    def getObjLocals(self):
        '''
        Return the mapping of Storm local names to their bound implementations.
        '''
        return {
            'get': self._httpEasyGet,
            'post': self._httpPost,
            'head': self._httpEasyHead,
            'request': self._httpRequest,
            'connect': self.inetHttpConnect,
            'urlencode': self.urlencode,
            'urldecode': self.urldecode,
            'codereason': self.codereason,
        }

    def strify(self, item):
        '''
        Coerce the keys and values of a header / parameter container to strings.

        Lists and tuples of pairs become a list of (str, str) tuples, dicts become
        a str -> str dict, and any other value (including None) is returned unchanged.
        '''
        if isinstance(item, (list, tuple)):
            return [(str(k), str(v)) for (k, v) in item]
        elif isinstance(item, dict):
            return {str(k): str(v) for k, v in item.items()}
        return item

    @s_stormtypes.stormfunc(readonly=True)
    async def urlencode(self, text):
        '''
        Return the text quoted with urllib.parse.quote_plus().
        '''
        text = await s_stormtypes.tostr(text)
        return urllib.parse.quote_plus(text)

    @s_stormtypes.stormfunc(readonly=True)
    async def urldecode(self, text):
        '''
        Return the text unquoted with urllib.parse.unquote_plus().
        '''
        text = await s_stormtypes.tostr(text)
        return urllib.parse.unquote_plus(text)

    @s_stormtypes.stormfunc(readonly=True)
    async def codereason(self, code):
        '''
        Return the reason phrase for an HTTP status code via s_common.httpcodereason().
        '''
        code = await s_stormtypes.toint(code)
        return s_common.httpcodereason(code)

    async def _httpEasyHead(self, url, headers=None, ssl_verify=True, params=None, timeout=300,
                            allow_redirects=False, proxy=None, ssl_opts=None):
        '''
        Issue a HEAD request; redirects are not followed unless allow_redirects is set.
        '''
        return await self._httpRequest('HEAD', url, headers=headers, ssl_verify=ssl_verify, params=params,
                                       timeout=timeout, allow_redirects=allow_redirects, proxy=proxy,
                                       ssl_opts=ssl_opts)

    async def _httpEasyGet(self, url, headers=None, ssl_verify=True, params=None, timeout=300,
                           allow_redirects=True, proxy=None, ssl_opts=None):
        '''
        Issue a GET request.
        '''
        return await self._httpRequest('GET', url, headers=headers, ssl_verify=ssl_verify, params=params,
                                       timeout=timeout, allow_redirects=allow_redirects, proxy=proxy,
                                       ssl_opts=ssl_opts)

    async def _httpPost(self, url, headers=None, json=None, body=None, ssl_verify=True,
                        params=None, timeout=300, allow_redirects=True, fields=None, proxy=None,
                        ssl_opts=None):
        '''
        Issue a POST request.
        '''
        return await self._httpRequest('POST', url, headers=headers, json=json, body=body,
                                       ssl_verify=ssl_verify, params=params, timeout=timeout,
                                       allow_redirects=allow_redirects, fields=fields, proxy=proxy,
                                       ssl_opts=ssl_opts)

    async def inetHttpConnect(self, url, headers=None, ssl_verify=True, timeout=300,
                              params=None, proxy=None, ssl_opts=None):
        '''
        Open a websocket connection and wrap it in a WebSocket Storm object.

        Specifying a proxy requires the ('storm', 'lib', 'inet', 'http', 'proxy')
        permission. When no proxy is given, the Cortex 'http:proxy' config option is used.

        Returns:
            tuple: (True, <WebSocket>) on success, otherwise the value of
            s_common.retnexc() for the exception raised while connecting.
        '''
        url = await s_stormtypes.tostr(url)
        headers = await s_stormtypes.toprim(headers)
        timeout = await s_stormtypes.toint(timeout, noneok=True)
        params = await s_stormtypes.toprim(params)
        proxy = await s_stormtypes.toprim(proxy)
        ssl_verify = await s_stormtypes.tobool(ssl_verify, noneok=True)
        ssl_opts = await s_stormtypes.toprim(ssl_opts)

        headers = self.strify(headers)

        if proxy is not None:
            self.runt.confirm(('storm', 'lib', 'inet', 'http', 'proxy'))

        if proxy is None:
            proxy = await self.runt.snap.core.getConfOpt('http:proxy')

        connector = None
        if proxy:
            connector = aiohttp_socks.ProxyConnector.from_url(proxy)

        timeout = aiohttp.ClientTimeout(total=timeout)
        kwargs = {'timeout': timeout}
        if params:
            kwargs['params'] = params

        kwargs['ssl'] = self.runt.snap.core.getCachedSslCtx(opts=ssl_opts, verify=ssl_verify)

        # Allocate the socket object only once every step that may raise outside
        # of the guarded block (permission check, config lookup, proxy / SSL setup)
        # has succeeded, so a failure there cannot leave an un-fini'd WebSocket behind.
        sock = await WebSocket.anit()

        try:
            sess = await sock.enter_context(aiohttp.ClientSession(connector=connector, timeout=timeout))
            sock.resp = await sock.enter_context(sess.ws_connect(url, headers=headers, **kwargs))

            sock._syn_refs = 0
            self.runt.onfini(sock)

            return (True, sock)

        except asyncio.CancelledError:  # pragma: no cover
            raise

        except Exception as e:  # pragma: no cover
            await sock.fini()
            return s_common.retnexc(e)

    def _buildFormData(self, fields):
        '''
        Build an aiohttp.FormData object from a list of field info dictionaries.

        Raises:
            s_exc.BadArg: If the form is multipart and a field name is not a string.
        '''
        data = aiohttp.FormData()
        for field in fields:
            name = field.get('name')
            data.add_field(name,
                           field.get('value'),
                           content_type=field.get('content_type'),
                           filename=field.get('filename'),
                           content_transfer_encoding=field.get('content_transfer_encoding'))
            if data.is_multipart and not isinstance(name, str):
                mesg = f'Each field requires a "name" key with a string value when multipart fields are enabled: {name}'
                raise s_exc.BadArg(mesg=mesg, name=name)
        return data

    async def _httpRequest(self, meth, url, headers=None, json=None, body=None,
                           ssl_verify=True, params=None, timeout=300, allow_redirects=True,
                           fields=None, proxy=None, ssl_opts=None):
        '''
        Make an HTTP request and return the result as an HttpResp.

        If any entry in fields contains a 'sha256' key, the request is delegated to
        the Axon via postfiles() (requires the ('storm', 'lib', 'axon', 'wput')
        permission). Otherwise the request is made with an aiohttp client session.

        Exceptions raised while making the aiohttp request (other than cancellation)
        are logged and returned as an HttpResp with code -1 and an 'err' entry.
        '''
        meth = await s_stormtypes.tostr(meth)
        url = await s_stormtypes.tostr(url)
        json = await s_stormtypes.toprim(json)
        body = await s_stormtypes.toprim(body)
        fields = await s_stormtypes.toprim(fields)
        headers = await s_stormtypes.toprim(headers)
        params = await s_stormtypes.toprim(params)
        timeout = await s_stormtypes.toint(timeout, noneok=True)
        ssl_verify = await s_stormtypes.tobool(ssl_verify, noneok=True)
        allow_redirects = await s_stormtypes.tobool(allow_redirects)
        proxy = await s_stormtypes.toprim(proxy)
        ssl_opts = await s_stormtypes.toprim(ssl_opts)

        kwargs = {'allow_redirects': allow_redirects}
        if params:
            kwargs['params'] = self.strify(params)

        headers = self.strify(headers)

        if proxy is not None:
            self.runt.confirm(('storm', 'lib', 'inet', 'http', 'proxy'))

        if fields:
            if any('sha256' in field for field in fields):
                self.runt.confirm(('storm', 'lib', 'axon', 'wput'))

                # The Axon call takes its own keyword set; start from scratch and
                # only pass options the connected Axon version is checked for.
                kwargs = {}
                axonvers = self.runt.snap.core.axoninfo['synapse']['version']
                if axonvers >= s_stormtypes.AXON_MINVERS_PROXY:
                    kwargs['proxy'] = proxy

                if ssl_opts is not None:
                    mesg = f'The ssl_opts argument requires an Axon Synapse version {s_stormtypes.AXON_MINVERS_SSLOPTS}, ' \
                           f'but the Axon is running {axonvers}'
                    s_version.reqVersion(axonvers, s_stormtypes.AXON_MINVERS_SSLOPTS, mesg=mesg)
                    kwargs['ssl_opts'] = ssl_opts

                axon = self.runt.snap.core.axon
                info = await axon.postfiles(fields, url, headers=headers, params=params, method=meth,
                                            ssl=ssl_verify, timeout=timeout, **kwargs)
                return HttpResp(info)

        kwargs['ssl'] = self.runt.snap.core.getCachedSslCtx(opts=ssl_opts, verify=ssl_verify)

        if proxy is None:
            proxy = await self.runt.snap.core.getConfOpt('http:proxy')

        connector = None
        if proxy:
            connector = aiohttp_socks.ProxyConnector.from_url(proxy)

        timeout = aiohttp.ClientTimeout(total=timeout)

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as sess:
            try:
                if fields:
                    data = self._buildFormData(fields)
                else:
                    data = body

                # `data` and `json` are passed in kwargs only if they are not
                # None because of a weird interaction with aiohttp and vcrpy.
                if data is not None:
                    kwargs['data'] = data

                if json is not None:
                    kwargs['json'] = json

                async with sess.request(meth, url, headers=headers, **kwargs) as resp:
                    history = []
                    for hist in resp.history:
                        hnfo = {
                            'code': hist.status,
                            'reason': await self.codereason(hist.status),
                            'headers': dict(hist.headers),
                            'url': str(hist.url),
                            # aiohttp has already closed the connection by this point
                            # so there is no connection to read a body from.
                            'body': b'',
                            'history': [],
                            'request_headers': dict(hist.request_info.headers)
                        }
                        history.append(hnfo)
                    info = {
                        'code': resp.status,
                        'reason': await self.codereason(resp.status),
                        'headers': dict(resp.headers),
                        'url': str(resp.url),
                        'body': await resp.read(),
                        'history': history,
                        'request_headers': dict(resp.request_info.headers)
                    }
                    return HttpResp(info)

            except asyncio.CancelledError:  # pragma: no cover
                raise

            except Exception as e:
                logger.exception(f'Error during http {meth} @ {url}')
                err = s_common.err(e)
                errmsg = err[1].get('mesg')
                if errmsg:
                    reason = f'Exception occurred during request: {err[0]}: {errmsg}'
                else:
                    reason = f'Exception occurred during request: {err[0]}'

                info = {
                    'err': err,
                    'code': -1,
                    'reason': reason,
                    'headers': dict(),
                    'url': url,
                    'body': b'',
                    'history': [],
                    'request_headers': dict(),
                }
                return HttpResp(info)
@s_stormtypes.registry.registerType
class HttpResp(s_stormtypes.Prim):
    '''
    Implements the Storm API for a HTTP response.
    '''
    _storm_locals = (
        {'name': 'code', 'desc': 'The HTTP status code. It is -1 if an exception occurred.',
         'type': 'int', },
        {'name': 'reason', 'desc': 'The reason phrase for the HTTP status code.', 'type': 'str'},
        {'name': 'body', 'desc': 'The raw HTTP response body as bytes.', 'type': 'bytes', },
        {'name': 'headers', 'type': 'dict', 'desc': 'The HTTP Response headers.'},
        {'name': 'request_headers', 'type': 'dict', 'desc': 'The HTTP Request headers.'},
        {'name': 'url', 'type': 'str',
         'desc': 'The response URL. If the request was redirected, this would be the final URL in the redirection chain. If the status code is -1, then this is the request URL.'},
        {'name': 'err', 'type': 'list', 'desc': 'Tuple of the error type and information if an exception occurred.'},
        {'name': 'history', 'desc': 'A list of response objects representing the history of the response. This is populated when responses are redirected.',
         'type': {'type': 'gtor', '_gtorfunc': '_gtorHistory',
                  'returns': {'type': 'list', 'desc': 'A list of ``inet:http:resp`` objects.', }}},
        {'name': 'json', 'desc': 'Get the JSON deserialized response.',
         'type': {'type': 'function', '_funcname': '_httpRespJson',
                  'args': (
                      {'name': 'encoding', 'type': 'str', 'desc': 'Specify an encoding to use.', 'default': None, },
                      {'name': 'errors', 'type': 'str', 'desc': 'Specify an error handling scheme to use.', 'default': 'surrogatepass', },
                  ),
                  'returns': {'type': 'prim'}
                  }
         },
        {'name': 'msgpack', 'desc': 'Yield the msgpack deserialized objects.',
         'type': {'type': 'function', '_funcname': '_httpRespMsgpack',
                  'returns': {'name': 'Yields', 'type': 'prim', 'desc': 'Unpacked values.'}
                  }
         },
    )
    _storm_typename = 'inet:http:resp'

    def __init__(self, valu, path=None):
        '''
        Wrap a response info dictionary and expose its entries as Storm locals.

        Args:
            valu (dict): The response info; the keys read here are url, code, reason,
                body, headers, request_headers and err.
            path: Passed through to the Prim constructor.
        '''
        super().__init__(valu, path=path)
        self.locls.update(self.getObjLocals())
        self.locls['url'] = self.valu.get('url')
        self.locls['code'] = self.valu.get('code')
        self.locls['reason'] = self.valu.get('reason')
        self.locls['body'] = self.valu.get('body')
        self.locls['headers'] = self.valu.get('headers')
        self.locls['request_headers'] = self.valu.get('request_headers')
        self.locls['err'] = self.valu.get('err', ())
        self.gtors.update({
            'history': self._gtorHistory,
        })

    def getObjLocals(self):
        '''
        Return the mapping of Storm local names to their bound implementations.
        '''
        return {
            'json': self._httpRespJson,
            'msgpack': self._httpRespMsgpack,
        }

    async def _httpRespJson(self, encoding=None, errors='surrogatepass'):
        '''
        Decode the response body as text and deserialize it as JSON.

        Args:
            encoding: Text encoding of the body; detected with json.detect_encoding()
                when None.
            errors: Error handling scheme passed to bytes.decode().

        Raises:
            s_exc.StormRuntimeError: If the body cannot be decoded with the encoding.
            s_exc.BadJsonText: If the decoded text is not valid JSON.
        '''
        try:
            valu = self.valu.get('body')
            errors = await s_stormtypes.tostr(errors)

            if encoding is None:
                encoding = json.detect_encoding(valu)
            else:
                encoding = await s_stormtypes.tostr(encoding)

            return json.loads(valu.decode(encoding, errors))

        except UnicodeDecodeError as e:
            raise s_exc.StormRuntimeError(mesg=f'{e}: {s_common.trimText(repr(valu))}') from None

        except json.JSONDecodeError as e:
            mesg = f'Unable to decode HTTP response as json: {e.args[0]}'
            raise s_exc.BadJsonText(mesg=mesg)

    async def _httpRespMsgpack(self):
        '''
        Yield each msgpack object unpacked from the response body.
        '''
        byts = self.valu.get('body')
        unpk = s_msgpack.Unpk()
        for _, item in unpk.feed(byts):
            yield item

    async def _gtorHistory(self):
        '''
        Return the redirect history as a list of HttpResp objects.

        A response info dictionary without a 'history' entry yields an empty list
        rather than failing on iteration over None.
        '''
        return [HttpResp(hnfo) for hnfo in self.valu.get('history') or ()]