# Source code for synapse.lib.stormlib.json

import copy
import json
import asyncio
import logging

import synapse.exc as s_exc

import synapse.lib.coro as s_coro
import synapse.lib.config as s_config
import synapse.lib.stormtypes as s_stormtypes

logger = logging.getLogger(__name__)

def runJsSchema(schema, item, use_default=True):
    '''
    Validate an item against a JSON schema and return the validator's result.

    Args:
        schema (dict): The JSON schema used to build the validator.
        item: The data structure to validate.
        use_default (bool): Passed through to ``s_config.getJsValidator``.

    Returns:
        Whatever the validator built by ``s_config.getJsValidator`` returns for ``item``.

    Notes:
        Callers in this module expect ``s_exc.SchemaViolation`` to propagate
        out of this function when validation fails.
    '''
    # This is a target function for multiprocessing
    func = s_config.getJsValidator(schema, use_default=use_default, handlers=None)
    resp = func(item)
    return resp
def compileJsSchema(schema, use_default=True):
    '''
    Check that a JSON schema can be turned into a validator.

    The validator itself is discarded; only the fact that building it did
    not raise is reported back to the caller.

    Args:
        schema (dict): The JSON schema to compile.
        use_default (bool): Passed through to ``s_config.getJsValidator``.

    Returns:
        bool: Always True when the validator was built without raising.
    '''
    # This is a target function for multiprocessing
    _ = s_config.getJsValidator(schema, use_default=use_default, handlers=None)
    return True
@s_stormtypes.registry.registerType
class JsonSchema(s_stormtypes.StormType):
    '''
    A JsonSchema validation object for use in validating data structures in Storm.
    '''
    _storm_typename = 'json:schema'
    _storm_locals = (
        {'name': 'schema',
         'desc': 'The schema belonging to this object.',
         'type': {'type': 'function', '_funcname': '_schema',
                  'returns': {'type': 'dict', 'desc': 'A copy of the schema used for this object.', }}},
        {'name': 'validate',
         'desc': 'Validate a structure against the Json Schema',
         'type': {'type': 'function', '_funcname': '_validate',
                  'args': ({'name': 'item', 'type': 'prim',
                            'desc': 'A JSON structure to validate (dict, list, etc...)', },
                           ),
                  'returns': {'type': 'list',
                              'desc': 'An ($ok, $valu) tuple. If $ok is True, then $valu should be used as the '
                                      'validated data structure. If $ok is False, $valu is a dictionary with a "mesg" '
                                      'key.'}}},
    )
    _ismutable = False

    def __init__(self, runt, schema, use_default=True):
        '''
        Args:
            runt: The Storm runtime this object belongs to (stored as ``self.runt``).
            schema (dict): The JSON schema this object validates against.
            use_default (bool): Forwarded to ``runJsSchema`` on every validation.
        '''
        s_stormtypes.StormType.__init__(self, None)
        self.runt = runt
        self.schema = schema
        self.use_default = use_default
        # Expose the methods declared in getObjLocals() as this object's locals.
        self.locls.update(self.getObjLocals())

    async def stormrepr(self):
        '''
        Return the type name followed by the raw schema.
        '''
        return f'{self._storm_typename}: {self.schema}'

    def getObjLocals(self):
        '''
        Map the names listed in ``_storm_locals`` to their bound implementations.
        '''
        return {
            'schema': self._schema,
            'validate': self._validate,
        }

    @s_stormtypes.stormfunc(readonly=True)
    async def _schema(self):
        # Hand out a deep copy so the caller cannot modify the schema held by this object.
        return copy.deepcopy(self.schema)

    @s_stormtypes.stormfunc(readonly=True)
    async def _validate(self, item):
        '''
        Validate ``item`` against this object's schema.

        Returns:
            tuple: ``(True, result)`` on success, where ``result`` is what
            ``runJsSchema`` returned; ``(False, {'mesg': ...})`` when a
            ``s_exc.SchemaViolation`` is raised.
        '''
        item = await s_stormtypes.toprim(item)

        try:
            # runJsSchema is dispatched through s_coro.semafork rather than called inline.
            result = await s_coro.semafork(runJsSchema, self.schema, item, use_default=self.use_default)
        except s_exc.SchemaViolation as e:
            return False, {'mesg': e.get('mesg')}
        else:
            return True, result
@s_stormtypes.registry.registerLib
class JsonLib(s_stormtypes.Lib):
    '''
    A Storm Library for interacting with Json data.
    '''
    _storm_locals = (
        {'name': 'load', 'desc': 'Parse a JSON string and return the deserialized data.',
         'type': {'type': 'function', '_funcname': '_jsonLoad',
                  'args': (
                      {'name': 'text', 'type': 'str', 'desc': 'The string to be deserialized.', },
                  ),
                  'returns': {'type': 'prim', 'desc': 'The JSON deserialized object.', }}},
        {'name': 'save', 'desc': 'Save an object as a JSON string.',
         'type': {'type': 'function', '_funcname': '_jsonSave',
                  'args': (
                      {'name': 'item', 'type': 'any', 'desc': 'The item to be serialized as a JSON string.', },
                  ),
                  'returns': {'type': 'str', 'desc': 'The JSON serialized object.', }}},
        {'name': 'schema', 'desc': 'Get a JS schema validation object.',
         'type': {'type': 'function', '_funcname': '_jsonSchema',
                  'args': (
                      {'name': 'schema', 'type': 'dict', 'desc': 'The JsonSchema to use.'},
                      {'name': 'use_default', 'type': 'boolean', 'default': True,
                       'desc': 'Whether to insert default schema values into the validated data structure.'},
                  ),
                  'returns': {'type': 'json:schema',
                              'desc': 'A validation object that can be used to validate data structures.'}}},
    )
    _storm_lib_path = ('json',)

    def getObjLocals(self):
        '''
        Map the names listed in ``_storm_locals`` to their bound implementations.
        '''
        return {
            'save': self._jsonSave,
            'load': self._jsonLoad,
            'schema': self._jsonSchema,
        }

    @s_stormtypes.stormfunc(readonly=True)
    async def _jsonSave(self, item):
        '''
        Serialize ``item`` to a JSON string.

        Raises:
            s_exc.MustBeJsonSafe: If converting or serializing the item fails.
        '''
        try:
            # The conversion stays inside the try so its failures are reported
            # the same way as serialization failures.
            item = await s_stormtypes.toprim(item)
            return json.dumps(item)
        except Exception as e:
            mesg = f'Argument is not JSON compatible: {item}'
            # Chain the original error so the root cause stays visible in tracebacks.
            raise s_exc.MustBeJsonSafe(mesg=mesg) from e

    @s_stormtypes.stormfunc(readonly=True)
    async def _jsonLoad(self, text):
        '''
        Deserialize a JSON string.

        Raises:
            s_exc.BadJsonText: If the text cannot be parsed as JSON.
        '''
        text = await s_stormtypes.tostr(text)
        try:
            return json.loads(text, strict=True)
        except Exception as e:
            mesg = f'Text is not valid JSON: {text}'
            # Chain the original error so the parse failure detail is not lost.
            raise s_exc.BadJsonText(mesg=mesg) from e

    @s_stormtypes.stormfunc(readonly=True)
    async def _jsonSchema(self, schema, use_default=True):
        '''
        Build a ``JsonSchema`` object after confirming the schema compiles.

        Raises:
            s_exc.StormRuntimeError: If compiling the schema fails for any
                reason other than task cancellation.
        '''
        schema = await s_stormtypes.toprim(schema)
        use_default = await s_stormtypes.tobool(use_default)

        # We have to ensure that we have a valid schema for making the object.
        try:
            await s_coro.semafork(compileJsSchema, schema, use_default=use_default)
        except asyncio.CancelledError:  # pragma: no cover
            raise
        except Exception as e:
            raise s_exc.StormRuntimeError(mesg=f'Unable to compile Json Schema: {str(e)}', schema=schema) from e

        return JsonSchema(self.runt, schema, use_default=use_default)