import io
import logging
import msgpack
import msgpack.fallback as m_fallback
import synapse.exc as s_exc
logger = logging.getLogger(__name__)
def _ext_un(code, byts):
if code == 0:
return int.from_bytes(byts, 'big')
elif code == 1:
return int.from_bytes(byts, 'big', signed=True)
else: # pragma: no cover
mesg = f'Invalid msgpack ext code: {code} ({repr(byts)[:20]})'
raise s_exc.SynErr(mesg=mesg)
def _ext_en(item):
if isinstance(item, int):
if item > 0xffffffffffffffff:
size = (item.bit_length() + 7) // 8
return msgpack.ExtType(0, item.to_bytes(size, 'big'))
if item < -0x8000000000000000:
size = (item.bit_length() // 8) + 1
return msgpack.ExtType(1, item.to_bytes(size, 'big', signed=True))
return item
# Single Packer object which is reused for performance
pakr = msgpack.Packer(use_bin_type=True, unicode_errors='surrogatepass', default=_ext_en)
if isinstance(pakr, m_fallback.Packer): # pragma: no cover
logger.warning('******************************************************************************************************')
logger.warning('* msgpack is using the pure python fallback implementation. This will impact performance negatively. *')
logger.warning('* Check https://github.com/msgpack/msgpack-python for troubleshooting msgpack on your platform. *')
logger.warning('******************************************************************************************************')
pakr = None
# synapse.lib.msgpack.un uses a hardcoded subset of these arguments for speed
unpacker_kwargs = {
'raw': False,
'use_list': False,
'strict_map_key': False,
'ext_hook': _ext_un,
'max_buffer_size': 2**32 - 1,
'unicode_errors': 'surrogatepass'
}
[docs]
def en(item):
'''
Use msgpack to serialize a compatible python object.
Args:
item (obj): The object to serialize
Notes:
String objects are encoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow encoding bad input strings.
Returns:
bytes: The serialized bytes in msgpack format.
'''
try:
return pakr.pack(item)
except TypeError as e:
pakr.reset()
mesg = f'{e.args[0]}: {repr(item)[:20]}'
raise s_exc.NotMsgpackSafe(mesg=mesg) from e
except Exception as e:
pakr.reset()
mesg = f'Cannot serialize: {repr(e)}: {repr(item)[:20]}'
raise s_exc.NotMsgpackSafe(mesg=mesg) from e
def _fallback_en(item):
'''
Use msgpack to serialize a compatible python object.
Args:
item (obj): The object to serialize
Notes:
String objects are encoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow encoding bad input strings.
Returns:
bytes: The serialized bytes in msgpack format.
'''
try:
return msgpack.packb(item, use_bin_type=True,
unicode_errors='surrogatepass', default=_ext_en)
except TypeError as e:
mesg = f'{e.args[0]}: {repr(item)[:20]}'
raise s_exc.NotMsgpackSafe(mesg=mesg) from e
except Exception as e:
mesg = f'Cannot serialize: {repr(e)}: {repr(item)[:20]}'
raise s_exc.NotMsgpackSafe(mesg=mesg) from e
# Redefine the en() function if we're in fallback mode.
if pakr is None: # pragma: no cover
en = _fallback_en
[docs]
def un(byts, use_list=False):
'''
Use msgpack to de-serialize a python object.
Args:
byts (bytes): The bytes to de-serialize
Notes:
String objects are decoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow decoding bad input strings.
Returns:
obj: The de-serialized object
'''
# This uses a subset of unpacker_kwargs
return msgpack.loads(byts, use_list=use_list, raw=False, strict_map_key=False,
unicode_errors='surrogatepass', ext_hook=_ext_un)
[docs]
def isok(item):
'''
Returns True if the item can be msgpacked (by testing packing).
'''
try:
en(item)
return True
except Exception:
return False
[docs]
def iterfd(fd):
'''
Generator which unpacks a file object of msgpacked content.
Args:
fd: File object to consume data from.
Notes:
String objects are decoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow decoding bad input strings.
Yields:
Objects from a msgpack stream.
'''
unpk = msgpack.Unpacker(fd, **unpacker_kwargs)
for mesg in unpk:
yield mesg
[docs]
def iterfile(path, since=-1):
'''
Generator which yields msgpack objects from a file path.
Args:
path: File path to open and consume data from.
Notes:
String objects are decoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow decoding bad input strings.
Yields:
Objects from a msgpack stream.
'''
with io.open(path, 'rb') as fd:
unpk = msgpack.Unpacker(fd, **unpacker_kwargs)
for i, mesg in enumerate(unpk):
if i <= since:
continue
yield mesg
[docs]
class Unpk:
'''
An extension of the msgpack streaming Unpacker which reports sizes.
Notes:
String objects are decoded using utf8 encoding. In order to handle
potentially malformed input, ``unicode_errors='surrogatepass'`` is set
to allow decoding bad input strings.
'''
def __init__(self):
self.size = 0
self.unpk = msgpack.Unpacker(**unpacker_kwargs)
[docs]
def feed(self, byts):
'''
Feed bytes to the unpacker and return completed objects.
Args:
byts (bytes): Bytes to unpack.
Notes:
It is intended that this function is called multiple times with
bytes from some sort of a stream, as it will unpack and return
objects as they are available.
Returns:
list: List of tuples containing the item size and the unpacked item.
'''
self.unpk.feed(byts)
retn = []
while True:
try:
item = self.unpk.unpack()
tell = self.unpk.tell()
retn.append((tell - self.size, item))
self.size = tell
except msgpack.exceptions.OutOfData:
break
return retn
[docs]
def loadfile(path):
'''
Load and upack the msgpack bytes from a file by path.
Args:
path (str): The file path to a message pack file.
Raises:
msgpack.exceptions.ExtraData: If the file contains multiple objects.
Returns:
(obj): The decoded python object.
'''
with io.open(path, 'rb') as fd:
return un(fd.read())
[docs]
def dumpfile(item, path):
'''
Dump an object to a file by path.
Args:
item (object): The object to serialize.
path (str): The file path to save.
Returns:
None
'''
with io.open(path, 'wb') as fd:
fd.write(en(item))
[docs]
def deepcopy(item, use_list=False):
'''
Copy a msgpack serializable by packing then unpacking it.
For complex primitives, this runs in about 1/3 the time of
copy.deepcopy()
'''
return un(en(item), use_list=use_list)
[docs]
def getvars(varz):
items = []
for item in varz.items():
if not isok(item):
continue
items.append(item)
return dict(items)