Source code for synapse.lib.jsrpc

'''
A reusable JSON-RPC 2.0 server implementation for the Synapse Tornado web server.

This module provides ``JsonRpcHandler``, a Tornado handler which exposes its own
``@s_jsrpc.method`` decorated methods as a JSON-RPC 2.0 endpoint. It is intentionally
generic: it knows nothing about any specific protocol built on top of it (such as MCP).

To use it, extend ``JsonRpcHandler``, implement the decorated methods directly, and mount
it on a Cell using the existing addHttpApi machinery::

    class FooApi(s_jsrpc.JsonRpcHandler):

        @s_jsrpc.method()
        async def echo(self, valu):
            return valu

    cell.addHttpApi('/api/v1/jsonrpc', FooApi, {'cell': cell})

The decorated methods must be coroutine functions or async generator functions. Async
generator methods may stream their results to the caller as Server-Sent Events when the
request carries an ``Accept: text/event-stream`` header.

A method recovers the calling user through the handler auth APIs (e.g. ``self.web_useriden``
or ``self.getAuthCell()``), which work whether auth is local or delegated to a remote cell.
'''
import inspect
import logging
from http import HTTPStatus

import synapse.exc as s_exc

import synapse.lib.json as s_json
import synapse.lib.config as s_config
import synapse.lib.httpapi as s_httpapi

logger = logging.getLogger(__name__)

# JSON-RPC 2.0 reserved error codes.
PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_PARAMS = -32602
INTERNAL_ERROR = -32603

# Server-defined error returned when a non-streaming caller invokes a generator method
# whose result set exceeds MAX_RESULT_ITEMS.
RESULT_TOO_LARGE = -32000

# Maximum number of items collected for a generator method when the caller is not
# streaming; beyond this the caller must use SSE (Accept: text/event-stream).
MAX_RESULT_ITEMS = 10000

[docs] def method(name=None, desc=None, params=None, returns=None): ''' Decorate a method to expose it as a remotely callable JSON-RPC method. Args: name (str): An optional JSON-RPC method name override. This allows names which are not valid python identifiers (e.g. ``tools/list``). Defaults to the function name. desc (str): A human readable description of the method. params (dict): An optional JSON Schema used to validate the request params. returns (dict): An optional JSON Schema describing the result for introspection. Notes: Only methods decorated with this decorator are exposed; it is an opt-in allowlist. The decorated method must be a coroutine function or an async generator function. ''' def wrap(func): if not (inspect.iscoroutinefunction(func) or inspect.isasyncgenfunction(func)): raise s_exc.BadArg(mesg=f'jsrpc method must be async: {func.__qualname__}') func._jsrpc_method = { 'name': name if name is not None else func.__name__, 'desc': desc, 'params': params, 'returns': returns, 'genr': inspect.isasyncgenfunction(func), } return func return wrap
def _methodArgSignature(func): # The argument signature of a handler method as seen on its bound method (the leading # 'self' parameter removed), used to validate request params. Computed once per method # registry entry rather than on every dispatch, since inspect.signature() is not cheap. sig = inspect.signature(func) return sig.replace(parameters=list(sig.parameters.values())[1:])
[docs] class JsonRpcHandler(s_httpapi.Handler): ''' A Tornado handler which exposes its own decorated methods as a JSON-RPC 2.0 endpoint. Subclass this and implement methods decorated with ``@s_jsrpc.method``. ''' @classmethod def _getMarkedMethods(cls, marker): ''' Return a list of ``(attrname, info)`` for callable members carrying the given marker attribute (set by a registration decorator). ''' retn = [] for attrname in dir(cls): attr = getattr(cls, attrname, None) if not callable(attr): continue info = getattr(attr, marker, None) if info is None: continue retn.append((attrname, info)) return retn
[docs] @classmethod def loadMethodDefs(cls): ''' Introspect the handler class and return its JSON-RPC method registry. Returns: dict: A JSON compatible mapping of JSON-RPC method name to ``{'attr': attrname, 'info': info}`` where info is the method definition (name, desc, params, returns, genr). The registry is cached on the class; the compiled params validators and method arg signatures are stored separately (in the ``_syn_jsrpc_validators`` and ``_syn_jsrpc_signatures`` class locals) so the registry remains JSON serializable and suitable for higher level introspection. ''' meths = cls.__dict__.get('_syn_jsrpc_meths') if meths is not None: return meths meths = {} validators = {} signatures = {} for attrname, info in cls._getMarkedMethods('_jsrpc_method'): meths[info.get('name')] = {'attr': attrname, 'info': info} signatures[info.get('name')] = _methodArgSignature(getattr(cls, attrname)) if info.get('params') is not None: validators[info.get('name')] = s_config.getJsValidator(info.get('params')) cls._syn_jsrpc_meths = meths cls._syn_jsrpc_validators = validators cls._syn_jsrpc_signatures = signatures return meths
[docs] async def post(self): if not await self.reqAuthUser(): return try: mesg = s_json.loads(self.request.body) except Exception: exc = s_exc.JsonRpcError.init(PARSE_ERROR, 'Parse error') self._sendResp(self._errResp(None, exc)) return if isinstance(mesg, list): await self._handleBatch(mesg) else: await self._handleSingle(mesg)
async def _handleSingle(self, req): kind, *rest = await self._dispatch(req, allow_stream=True) if kind == 'stream': reqid, agen = rest await self._streamSse(reqid, agen) return resp = rest[0] if resp is None: self.set_status(HTTPStatus.NO_CONTENT) return self._sendResp(resp) async def _handleBatch(self, batch): if len(batch) == 0: exc = s_exc.JsonRpcError.init(INVALID_REQUEST, 'Invalid Request') self._sendResp(self._errResp(None, exc)) return resps = [] for req in batch: kind, resp = await self._dispatch(req, allow_stream=False) if resp is not None: resps.append(resp) if not resps: self.set_status(HTTPStatus.NO_CONTENT) return self._sendResp(resps) async def _dispatch(self, req, allow_stream): ''' Dispatch a single parsed JSON-RPC request object. Returns: tuple: Either ``('resp', obj_or_None)`` where obj is a JSON-RPC response object (or None to suppress a notification response), or ``('stream', reqid, agen)`` to stream an async generator to the caller. ''' if not self._isValidReq(req): # The validity of the request is in question, so the id cannot be trusted and # the JSON-RPC spec requires the response id to be null. exc = s_exc.JsonRpcError.init(INVALID_REQUEST, 'Invalid Request') return ('resp', self._errResp(None, exc)) hasid = 'id' in req reqid = req.get('id') name = req.get('method') params = req.get('params') try: entry = self.loadMethodDefs().get(name) if entry is None: raise s_exc.JsonRpcError.init(METHOD_NOT_FOUND, f'Method not found: {name}') args, kwargs = self._bindParams(params) meth = getattr(self, entry.get('attr')) try: self._syn_jsrpc_signatures.get(name).bind(*args, **kwargs) except TypeError as e: raise s_exc.JsonRpcError.init(INVALID_PARAMS, f'Invalid params: {e}') validator = self._syn_jsrpc_validators.get(name) if validator is not None: try: validator(params) except s_exc.SchemaViolation as e: raise s_exc.JsonRpcError.init(INVALID_PARAMS, e.get('mesg', str(e))) if entry.get('info').get('genr'): agen = meth(*args, **kwargs) if hasid and allow_stream and self._wantsStream(): return ('stream', reqid, agen) # Without streaming we must buffer the whole result; cap it so a large # generator result set cannot exhaust memory. result = [] async for item in agen: if len(result) >= MAX_RESULT_ITEMS: await agen.aclose() raise s_exc.JsonRpcError.init(RESULT_TOO_LARGE, 'Result set too large; retry with SSE streaming (Accept: text/event-stream).') result.append(item) else: result = await meth(*args, **kwargs) except s_exc.JsonRpcError as e: if not hasid: return ('resp', None) return ('resp', self._errResp(reqid, e)) except Exception as e: logger.exception(f'jsonrpc method error: {name}') if not hasid: return ('resp', None) return ('resp', self._internalErrResp(reqid, e)) if not hasid: return ('resp', None) return ('resp', {'jsonrpc': '2.0', 'id': reqid, 'result': result}) def _isValidReq(self, req): if not isinstance(req, dict): return False if req.get('jsonrpc') != '2.0': return False if not isinstance(req.get('method'), str): return False if 'id' in req: reqid = req.get('id') if isinstance(reqid, bool) or not isinstance(reqid, (str, int, float, type(None))): return False return True def _bindParams(self, params): if params is None: return (), {} if isinstance(params, (list, tuple)): return tuple(params), {} if isinstance(params, dict): return (), dict(params) raise s_exc.JsonRpcError.init(INVALID_PARAMS, 'Params must be an array or object.') def _errResp(self, reqid, exc): err = {'code': exc.get('code'), 'message': exc.get('mesg')} data = exc.get('data') if data is not None: err['data'] = data return {'jsonrpc': '2.0', 'id': reqid, 'error': err} def _internalErrResp(self, reqid, exc): err = {'code': INTERNAL_ERROR, 'message': 'Internal error'} if isinstance(exc, s_exc.SynErr): err['message'] = exc.get('mesg', str(exc)) data = self._safeData(exc.items()) if data: err['data'] = data else: err['message'] = str(exc) return {'jsonrpc': '2.0', 'id': reqid, 'error': err} def _safeData(self, info): try: s_json.dumps(info) return info except Exception: return None def _wantsStream(self): return 'text/event-stream' in self.request.headers.get('Accept', '') def _sendResp(self, obj): self.set_header('Content-Type', 'application/json') self.write(s_json.dumps(obj)) async def _streamSse(self, reqid, agen): self.set_header('Content-Type', 'text/event-stream') self.set_header('Cache-Control', 'no-cache') try: async for item in agen: mesg = {'jsonrpc': '2.0', 'method': 'data', 'params': {'id': reqid, 'item': item}} await self._sendSse(mesg) resp = {'jsonrpc': '2.0', 'id': reqid, 'result': None} except Exception as e: logger.exception('jsonrpc stream error') resp = self._internalErrResp(reqid, e) await self._sendSse(resp) async def _sendSse(self, mesg): self.write(b'data: ') self.write(s_json.dumps(mesg)) self.write(b'\n\n') await self.flush()