Source code for synapse.lib.logging

import os
import sys
import asyncio
import logging
import weakref
import traceback
import collections

import synapse.exc as s_exc

import synapse.lib.coro as s_coro
import synapse.lib.json as s_json
import synapse.lib.const as s_const
import synapse.lib.scope as s_scope

logger = logging.getLogger(__name__)

_log_wins = weakref.WeakSet()

LOG_PUMP_TASK_TIMEOUT = 1
LOG_QUEUE_SIZE = 1000

def excinfo(e, _seen=None):

    if _seen is None:
        _seen = set()

    _seen.add(e)

    tb = []
    for path, line, func, sorc in traceback.extract_tb(e.__traceback__):
        tb.append((path, line, func, sorc))

    ret = {
        'code': e.__class__.__name__,
        'traceback': tb,
    }

    if notes := getattr(e, '__notes__', None):
        ret['notes'] = tuple(notes)

    if (cause := getattr(e, '__cause__', None)) is not None:
        if isinstance(cause, Exception) and cause not in _seen:
            ret['cause'] = excinfo(cause, _seen=_seen)

    if (context := getattr(e, '__context__', None)) is not None:
        if isinstance(context, Exception) and context not in _seen:
            ret['context'] = excinfo(context, _seen=_seen)

    if isinstance(e, s_exc.SynErr):
        ret['info'] = e.errinfo.copy()
        ret['mesg'] = ret['info'].pop('mesg', None)

    if isinstance(e, BaseExceptionGroup) and e.exceptions:
        ret['group'] = [excinfo(exc) for exc in e.exceptions]

    if ret.get('mesg') is None:
        ret['mesg'] = str(e)

    return ret
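
# Usage sketch (illustrative, not part of the module): excinfo() flattens a
# caught exception into a JSON-safe dict with the keys populated above.
#
#   try:
#       raise s_exc.BadArg(mesg='oops', valu=10)
#   except s_exc.SynErr as e:
#       info = excinfo(e)
#       # info['code'] == 'BadArg'
#       # info['mesg'] == 'oops'
#       # info['traceback'] is a list of (path, line, func, source) tuples
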
_glob_loginfo = {}

def setLogInfo(name, valu):
    '''
    Configure global values which should be added to every log.
    '''
    _glob_loginfo[name] = valu

def popLogInfo(name):
    '''
    Remove a global value from being added to every log.
    '''
    _glob_loginfo.pop(name, None)

def getLogExtra(**kwargs):
    '''
    Construct a properly enveloped log extra dictionary.
    '''
    extra = {'params': kwargs, 'loginfo': {}}
    return extra
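
# Usage sketch (illustrative): pass the result as the stdlib ``extra``
# argument; the kwargs land under the ``params`` key of the structured
# loginfo built by JsonFormatter.genLogInfo() below. The kwarg names here
# are made up for the example.
#
#   logger.info('user created', extra=getLogExtra(user='visi', role='admin'))
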

class JsonFormatter(logging.Formatter):

    def genLogInfo(self, record):

        record.message = record.getMessage()

        loginfo = {
            'message': record.message,
            'logger': {
                'name': record.name,
                'func': record.funcName,
                'process': record.processName,
                'thread': record.threadName,
            },
            'level': record.levelname,
            'time': self.formatTime(record, self.datefmt),
        }

        loginfo.update(_glob_loginfo)

        if hasattr(record, 'loginfo'):
            loginfo.update(record.loginfo)

        if (user := s_scope.get('user')) is not None:
            loginfo['user'] = user.iden
            loginfo['username'] = user.name

        elif (sess := s_scope.get('sess')) is not None:
            if sess.user is not None:
                loginfo['user'] = sess.user.iden
                loginfo['username'] = sess.user.name

        if record.exc_info:
            loginfo['error'] = excinfo(record.exc_info[1])

        if not hasattr(record, 'params'):
            record.params = {}

        loginfo['params'] = record.params

        # the subsequent emit() will set the event
        StreamHandler._logs_fifo.append(loginfo)
        StreamHandler._logs_todo.append(loginfo)

        return loginfo

    def format(self, record):
        loginfo = self.genLogInfo(record)
        return s_json.dumps(loginfo, default=str).decode()
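
# Output sketch (illustrative; the exact fields depend on the record, the
# configured datefmt, and any global loginfo values):
#
#   {"message": "user created", "logger": {"name": "synapse.example", ...},
#    "level": "INFO", "time": "2024-01-01 00:00:00,000",
#    "params": {"user": "visi", "role": "admin"}}
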

class TextFormatter(JsonFormatter):

    def __init__(self, *args, **kwargs):
        kwargs['fmt'] = s_const.LOG_FORMAT
        return super().__init__(*args, **kwargs)

    def format(self, record):
        # this is required to send the structured data
        loginfo = self.genLogInfo(record)
        return logging.Formatter.format(self, record)

class StreamHandler(logging.StreamHandler):

    _pump_task = None
    _pump_event = None
    _pump_exit_flag = False
    _glob_handler = None

    _logs_fifo = collections.deque(maxlen=LOG_QUEUE_SIZE)
    _logs_todo = collections.deque(maxlen=LOG_QUEUE_SIZE)
    _text_todo = collections.deque(maxlen=LOG_QUEUE_SIZE)

    def emit(self, record):

        if self._pump_task is None:
            return logging.StreamHandler.emit(self, record)

        try:
            text = self.format(record)
            self._text_todo.append(text)
            self._pump_event.set()

        # emulating behavior of parent class
        except RecursionError:  # pragma: no cover
            raise

        except Exception as e:  # pragma: no cover
            self.handleError(record)

def _writestderr(text):
    sys.stderr.write(text)
    sys.stderr.flush()

async def _pumpLogStream():

    while True:

        try:
            await StreamHandler._pump_event.wait()

            logstodo = tuple(StreamHandler._logs_todo)
            texttodo = tuple(StreamHandler._text_todo)

            if not logstodo and not texttodo:
                StreamHandler._pump_event.clear()
                if StreamHandler._pump_exit_flag is True:
                    return
                continue  # pragma: no cover

            StreamHandler._logs_todo.clear()
            StreamHandler._text_todo.clear()
            StreamHandler._pump_event.clear()

            fulltext = '\n'.join(texttodo) + '\n'

            for wind in _log_wins:
                await wind.puts(logstodo)

            # Don't hold onto refs of the Window objects inside of this
            # function after we have used them. If we don't clear this ref,
            # then we will hold a reference to the window object longer than
            # needed. This can lead to the last window object never being
            # GC'd while the pumpLogStream task is running, even after its
            # caller has exited the watch() function.
            wind = None  # NOQA

            await s_coro.executor(_writestderr, fulltext)

            if StreamHandler._pump_exit_flag is True and len(StreamHandler._logs_todo) == 0 and len(StreamHandler._text_todo) == 0:
                return

        except Exception:
            _writestderr('Error during log handling:\n' + traceback.format_exc())

def logs(last=100):
    return tuple(StreamHandler._logs_fifo)[-last:]

async def watch(last=100):

    # avoid a circular import...
    import synapse.lib.queue as s_queue

    async with await s_queue.Window.anit(maxsize=10000) as window:

        await window.puts(logs(last=last))

        _log_wins.add(window)

        async for item in window:
            yield item
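
# Usage sketch (illustrative): watch() is an async generator which first
# replays up to ``last`` buffered loginfo dicts and then yields new ones
# as the pump task delivers them.
#
#   async for loginfo in watch(last=10):
#       print(loginfo['time'], loginfo['level'], loginfo['message'])
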
_glob_logconf = {}

def setup(**conf):
    '''
    Configure synapse logging.

    NOTE: If this API is invoked while there is a running asyncio loop, it
          will automatically enter async mode and fire a task to pump log
          events without blocking.
    '''
    conf.update(getLogConfFromEnv())

    if conf.get('level') is None:
        conf['level'] = logging.INFO

    if conf.get('structlog') is None:
        conf['structlog'] = False

    fmtclass = JsonFormatter
    if not conf.get('structlog'):
        fmtclass = TextFormatter

    if s_coro.has_running_loop() and StreamHandler._pump_task is None:
        StreamHandler._pump_event = asyncio.Event()
        StreamHandler._pump_task = s_coro.create_task(_pumpLogStream())

    # this is used to pass things like service name
    # to child processes and forked workers...
    loginfo = conf.pop('loginfo', None)
    if loginfo is not None:
        _glob_loginfo.update(loginfo)

    _glob_logconf.clear()
    _glob_logconf.update(conf)

    rootlogger = logging.getLogger()

    level = normLogLevel(conf.get('level'))
    rootlogger.setLevel(level)

    if StreamHandler._glob_handler is None:

        handler = StreamHandler()
        handler.setFormatter(fmtclass(datefmt=conf.get('datefmt')))

        StreamHandler._glob_handler = handler
        rootlogger.handlers.append(handler)

    return conf
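
# Usage sketch (illustrative): a tool entry point might enable structured
# DEBUG logging and tag every log with a service name. Values shown are
# examples only; any SYN_LOG_* environment variables take precedence, since
# setup() applies getLogConfFromEnv() over its kwargs.
#
#   setup(level='DEBUG', structlog=True, loginfo={'service': 'example'})
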

def reset(clear=True):

    # This may be called by tests to cleanup loop specific objects
    # (it does not need to be called in general by service fini)

    if StreamHandler._glob_handler is not None:
        rootlogger = logging.getLogger()
        rootlogger.handlers.remove(StreamHandler._glob_handler)

    if StreamHandler._pump_task is not None:
        StreamHandler._pump_task.cancel()

    StreamHandler._pump_task = None
    StreamHandler._pump_event = None
    StreamHandler._pump_exit_flag = False
    StreamHandler._glob_handler = None

    StreamHandler._text_todo.clear()
    StreamHandler._logs_fifo.clear()
    StreamHandler._logs_todo.clear()

    if clear:
        _glob_logconf.clear()
        _glob_loginfo.clear()

async def _shutdown_task():

    # Give the pump task a small opportunity to drain its
    # queue of items and exit cleanly.
    if StreamHandler._pump_task is not None:

        StreamHandler._pump_exit_flag = True  # Set the task to exit
        StreamHandler._pump_event.set()  # Wake the task

        try:
            await asyncio.wait_for(StreamHandler._pump_task, timeout=LOG_PUMP_TASK_TIMEOUT)
        except asyncio.TimeoutError:  # pragma: no cover
            pass

async def shutdown():  # pragma: no cover
    '''
    Inverse of setup. Gives the pump task the opportunity to exit before
    removing it and resetting log attributes. A StreamHandler is then
    re-installed on the root logger to allow for messages from sources
    like atexit handlers to be logged.

    This should be called at service or tool teardown.
    '''
    await _shutdown_task()

    # Reset all logging configs except globals since we may need those.
    reset(clear=False)

    fmtclass = JsonFormatter
    if not _glob_logconf.get('structlog'):
        fmtclass = TextFormatter

    # Reinstall a StreamHandler and the formatter on the root logger
    rootlogger = logging.getLogger()
    stream = logging.StreamHandler()
    stream.setFormatter(fmtclass(datefmt=_glob_logconf.get('datefmt')))
    rootlogger.addHandler(stream)

def getLogConf():
    logconf = _glob_logconf.copy()
    logconf['loginfo'] = _glob_loginfo.copy()
    return logconf

def getLogConfFromEnv():

    conf = {}

    if (level := os.getenv('SYN_LOG_LEVEL')) is not None:
        conf['level'] = normLogLevel(level)

    if (datefmt := os.getenv('SYN_LOG_DATEFORMAT')) is not None:
        conf['datefmt'] = datefmt

    if (structlog := os.getenv('SYN_LOG_STRUCT')) is not None:
        conf['structlog'] = structlog.lower() in ('1', 'true')

    return conf
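
# Usage sketch (illustrative): with this environment, setup() would emit
# JSON logs at DEBUG level regardless of the kwargs it was given.
#
#   os.environ['SYN_LOG_LEVEL'] = 'DEBUG'
#   os.environ['SYN_LOG_STRUCT'] = 'true'
#   getLogConfFromEnv()  # -> {'level': logging.DEBUG, 'structlog': True}
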

def normLogLevel(valu):
    '''
    Normalize a log level value to an integer.

    Args:
        valu: The value to norm ( a string or integer ).

    Returns:
        int: A valid log level integer.
    '''
    if isinstance(valu, str):

        valu = valu.strip()

        level = s_const.LOG_LEVEL_CHOICES.get(valu.upper())
        if level is not None:
            return level

        try:
            valu = int(valu)
        except ValueError:
            raise s_exc.BadArg(mesg=f'Invalid log level provided: {valu}', valu=valu) from None

    if isinstance(valu, int):
        if valu not in s_const.LOG_LEVEL_INVERSE_CHOICES:
            raise s_exc.BadArg(mesg=f'Invalid log level provided: {valu}', valu=valu)
        return valu

    raise s_exc.BadArg(mesg=f'Unknown log level type: {type(valu)} {valu}', valu=valu)
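
# Usage sketch (illustrative, assuming s_const.LOG_LEVEL_CHOICES maps the
# standard level names to stdlib integers and LOG_LEVEL_INVERSE_CHOICES
# holds the valid integer levels):
#
#   normLogLevel('debug')   # -> logging.DEBUG
#   normLogLevel(' INFO ')  # -> logging.INFO (whitespace is stripped)
#   normLogLevel(17)        # raises s_exc.BadArg (not a valid level)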