Source code for synapse.glob

import os
import signal
import asyncio
import logging
import threading
import faulthandler

logger = logging.getLogger(__name__)

_glob_loop = None
_glob_thrd = None


def _asynciostacks(*args, **kwargs):  # pragma: no cover
    '''
    A signal handler used to print asyncio task stacks and thread stacks.
    '''
    print(80 * '*')
    print('Asyncio tasks stacks:')
    tasks = asyncio.all_tasks(_glob_loop)
    for task in tasks:
        task.print_stack()
    print(80 * '*')
    print('Faulthandler stack frames per thread:')
    faulthandler.dump_traceback()
    print(80 * '*')

def _threadstacks(*args, **kwargs):  # pragma: no cover
    '''
    A signal handler used to print thread stacks.
    '''
    print(80 * '*')
    print('Faulthandler stack frames per thread:')
    faulthandler.dump_traceback()
    print(80 * '*')

signal.signal(signal.SIGUSR1, _threadstacks)
signal.signal(signal.SIGUSR2, _asynciostacks)

[docs]def initloop(): global _glob_loop global _glob_thrd # if there's no global loop.... if _glob_loop is None: # check if it's us.... try: _glob_loop = asyncio.get_running_loop() # if we get here, it's us! _glob_thrd = threading.current_thread() # Enable debug and greedy coro collection setGreedCoro(_glob_loop) except RuntimeError: _glob_loop = asyncio.new_event_loop() setGreedCoro(_glob_loop) _glob_thrd = threading.Thread(target=_glob_loop.run_forever, name='SynLoop', daemon=True) _glob_thrd.start() return _glob_loop
[docs]def setGreedCoro(loop: asyncio.AbstractEventLoop): greedy_threshold = os.environ.get('SYN_GREEDY_CORO') if greedy_threshold is not None: # pragma: no cover logger.info(f'Setting ioloop.slow_callback_duration to {greedy_threshold}') loop.set_debug(True) loop.slow_callback_duration = float(greedy_threshold)
[docs]def iAmLoop(): initloop() return threading.current_thread() == _glob_thrd
[docs]def sync(coro, timeout=None): ''' Schedule a coroutine to run on the global loop and return it's result. Args: coro (coroutine): The coroutine instance. Notes: This API is thread safe and should only be called by non-loop threads. ''' loop = initloop() return asyncio.run_coroutine_threadsafe(coro, loop).result(timeout)
[docs]def synchelp(f): ''' The synchelp decorator allows the transparent execution of a coroutine using the global loop from a thread other than the event loop. In both use cases, the actual work is done by the global event loop. Examples: Use as a decorator:: @s_glob.synchelp async def stuff(x, y): await dostuff() Calling the stuff function as regular async code using the standard await syntax:: valu = await stuff(x, y) Calling the stuff function as regular sync code outside of the event loop thread:: valu = stuff(x, y) ''' def wrap(*args, **kwargs): coro = f(*args, **kwargs) if not iAmLoop(): return sync(coro) return coro return wrap