# Source code for synapse.lib.base

import gc
import os
import sys
import atexit
import signal
import asyncio
import inspect
import logging
import weakref
import contextlib
import collections

if __debug__:
    import traceback

import synapse.exc as s_exc
import synapse.glob as s_glob

import synapse.lib.coro as s_coro
import synapse.lib.scope as s_scope
import synapse.lib.logging as s_logging

logger = logging.getLogger(__name__)

# Developer knob read once at import time.  When truthy, _fini_atexit() skips its
# "Missing fini" debug report for leaked Bases and instead falls through to calling
# fini() on them.  NOTE: the raw environment string is used for truthiness, so any
# non-empty value (including "0" or "false") counts as set.
OMIT_FINI_WARNS = os.environ.get('SYNDEV_OMIT_FINI_WARNS', False)
# Default ``timeout`` for Base.main(), which hands it to s_coro.await_bg_tasks().
# Presumably seconds - confirm against s_coro.await_bg_tasks.  A non-integer value
# in the environment raises ValueError at import time.
BASE_MAIN_BG_TASK_TIMEOUT = int(os.environ.get('SYNDEV_BASE_MAIN_BG_TASK_TIMEOUT', 3))

def _fini_atexit():  # pragma: no cover
    '''
    Interpreter-exit hook which inspects every live Base instance.

    For each Base that was anit()ted but never fini()d:

    - If the Base did not opt in via ``_fini_atexit`` (and SYNDEV_OMIT_FINI_WARNS
      is not set), the leak is reported at debug level together with the call
      stack captured when the Base was constructed, and the Base is left alone.
    - Otherwise fini() is invoked.  Because fini() is a coroutine function, the
      resulting coroutine is handed to the Base's own event loop when that loop
      is still running, and closed when it is not.
    '''
    for item in gc.get_objects():

        if not isinstance(item, Base):
            continue

        if not item.anitted:
            continue

        if item.isfini:
            continue

        if not item._fini_atexit and not OMIT_FINI_WARNS:
            if __debug__:
                logger.debug(f'At exit: Missing fini for {item}')
                for depth, call in enumerate(item.call_stack[:-2]):
                    logger.debug(f'{depth + 1:3}: {call.strip()}')
            continue

        try:
            if __debug__:
                logger.debug('At exit: Calling fini for %r', item)
            rv = item.fini()
            if s_coro.iscoro(rv):
                # Try to run the fini on its loop
                loop = item.loop
                if not loop.is_running():
                    # Nothing will ever drive this coroutine; close it so it does
                    # not trigger a "coroutine ... was never awaited" warning.
                    rv.close()
                    continue
                # atexit callbacks run after the main thread's code has finished,
                # so a loop that is still running lives in another thread.
                # loop.create_task() is not thread-safe; use the thread-safe
                # submission API, which also wakes the loop.
                asyncio.run_coroutine_threadsafe(rv, loop)

        except Exception:
            logger.exception('atexit fini fail: %r', item)

atexit.register(_fini_atexit)

class Base:
    '''
    Base class for Synapse objects.

    Acts as an observable, enables async init and fini.

    Example:

        class Foo(Base):

            async def __anit__(self, x, y):

                await Base.__anit__(self)

                await stuff(x, y)

        foo = await Foo.anit(10, 20)

    Note:
        One should not create instances directly via its initializer, i.e. Base(). One shall always use the class
        method anit.
    '''
    def __init__(self):
        self.anitted = False
        # Only anit() is allowed to call the initializer; check the caller's function name.
        assert sys._getframe(1).f_code.co_name == 'anit', 'Objects from Base must be constructed solely via "anit"'

    @classmethod
    async def anit(cls, *args, **kwargs):
        '''
        Construct and asynchronously initialize an instance of the class.

        The arguments are passed through to ``__anit__()``.  After ``__anit__()`` succeeds, ``postAnit()`` is
        awaited.  If either step raises (including cancellation), the partially constructed object is
        fini()d (when it got far enough to be marked anitted) and the exception is re-raised.

        Returns:
            The initialized instance.
        '''
        # sneak in a quick loop check here for convenience
        if s_glob._glob_loop is None:
            s_glob.initloop()

        self = cls()

        try:

            await self.__anit__(*args, **kwargs)

        except (asyncio.CancelledError, Exception):

            if self.anitted:
                await self.fini()

            raise

        try:

            await self.postAnit()

        except (asyncio.CancelledError, Exception):
            logger.exception('Error during postAnit callback.')
            await self.fini()
            raise

        return self

    async def __anit__(self):
        '''
        Asynchronous initializer.  Subclasses must await ``Base.__anit__(self)`` before using Base APIs.
        '''
        self.loop = asyncio.get_running_loop()
        if __debug__:
            import synapse.lib.threads as s_threads  # avoid import cycle
            self.tid = s_threads.iden()
            self.call_stack = traceback.format_stack()  # For cleanup debugging

        if object.__getattribute__(self, 'anitted') is True:
            # The Base has already been anitted. This allows a class to treat
            # multiple Base objects as a mixin and __anit__ themselves without
            # smashing fini or event handlers from the others.
            return

        self.isfini = False
        self.anitted = True  # For assertion purposes
        self.finievt = asyncio.Event()

        # hold a weak ref to other bases we should fini if they
        # are still around when we go down...
        self.tofini = weakref.WeakSet()

        self._syn_funcs = collections.defaultdict(list)

        self._syn_refs = 1  # one ref for the ctor
        self._syn_links = []
        self._fini_funcs = []
        self._fini_atexit = False
        self._active_tasks = None  # the set of free running tasks associated with me
        self._context_managers = None  # the set of context managers i must fini
        self._syn_signal_tasks = None  # initialized as a Set when addSignalHandlers is called.

    async def postAnit(self):
        '''
        Method called after self.__anit__() has completed, but before anit() returns the object to the caller.
        '''
        pass

    async def enter_context(self, item):
        '''
        Modeled on Python's contextlib.ExitStack.enter_context.  Enters a new context manager and adds its __exit__()
        and __aexit__ method to its onfini handlers.

        Returns:
            The result of item's own __aenter__ or __enter__() method.

        Raises:
            s_exc.IsFini: If this Base has already been fini()d.
        '''
        if self.isfini:  # pragma: no cover
            mesg = 'Cannot enter_context on a fini()d object.'
            raise s_exc.IsFini(mesg=mesg)

        if self._context_managers is None:
            self._context_managers = []

        self._context_managers.append(item)

        # Prefer the async protocol when the item supports both.
        entr = getattr(item, '__aenter__', None)
        if entr is not None:
            return await entr()

        entr = getattr(item, '__enter__', None)
        assert entr is not None
        return entr()

    def onfini(self, func):
        '''
        Add a function/coroutine/Base to be called on fini().

        The rules around how to register function/coroutine/Base to be called:

            - Call this method with an instance of Base (this class) if holding
              a reference to a bound method of the instance (such as a fini()
              method) would cause the object to be leaked. This is appropriate
              for ephemeral objects that may be constructed/destroyed multiple
              times over the lifetime of a process.

            - Call this method with an instance method if you want the object to
              have a lifetime as long as the thing being fini'd.

        Note:
            If this Base is already fini()d, the callback (or the given Base's fini()) is scheduled
            immediately as a task instead of being registered.
        '''
        if self.isfini:
            if isinstance(func, Base):
                s_coro.create_task(func.fini())
            else:
                s_coro.create_task(s_coro.ornot(func))
            return

        if isinstance(func, Base):
            # weakly held, so registering does not extend the other Base's lifetime
            self.tofini.add(func)
            return

        assert self.anitted
        self._fini_funcs.append(func)

    async def __aenter__(self):
        assert asyncio.get_running_loop() == self.loop
        return self

    async def __aexit__(self, exc, cls, tb):
        # Either there should be no running loop or we shall be on the right one
        try:
            assert asyncio.get_running_loop() == self.loop
        except RuntimeError:
            pass

        await self.fini()

    def incref(self):
        '''
        Increment the reference count for this base.
        This API may be optionally used to control fini().

        Returns:
            int: The new reference count.
        '''
        self._syn_refs += 1
        return self._syn_refs

    def on(self, evnt, func, base=None):
        '''
        Add a callback function for a specific event.

        If the function returns a coroutine, it will be awaited.

        Args:
            evnt (str):         An event name
            func (function):    A callback function to receive event tufo
            base (Base):        Optional Base; when it is fini()d the callback is removed again.

        Examples:

            Add a callback function and fire it:

                async def baz(event):
                    x = event[1].get('x')
                    y = event[1].get('y')
                    return x + y

                d.on('foo', baz)

                # this fire triggers baz...
                await d.fire('foo', x=10, y=20)

        Returns:
            None:
        '''
        funcs = self._syn_funcs[evnt]
        if func in funcs:
            # registering the same callback twice is a no-op
            return

        funcs.append(func)

        if base is not None:

            def fini():
                self.off(evnt, func)

            base.onfini(fini)

    def off(self, evnt, func):
        '''
        Remove a previously registered event handler function.

        Unknown events and unregistered functions are silently ignored.

        Example:

            base.off( 'foo', onFooFunc )

        '''
        funcs = self._syn_funcs.get(evnt)
        if funcs is None:
            return

        try:
            funcs.remove(func)
        except ValueError:
            pass

    async def fire(self, evtname, **info):
        '''
        Fire the given event name on the Base.

        Builds the event tuple ``(evtname, info)`` and distributes it to the registered callbacks
        unless the Base has been fini()d.

        Returns:
            (str, dict): The event tuple.

        Example:

            event = await d.fire('woot', foo='asdf')

        '''
        event = (evtname, info)
        if self.isfini:
            return event

        await self.dist(event)
        return event

    def link(self, func):
        '''
        Add a callback function to receive *all* events distributed by this Base.

        Example:

            base1.link( base2.dist )

            # all events on base1 are also propagated on base2

        '''
        self._syn_links.append(func)

    def unlink(self, func):
        '''
        Remove a callback function previously added with link().

        Functions which are not currently linked are silently ignored.

        Example:

            base.unlink( callback )

        '''
        try:
            self._syn_links.remove(func)
        except ValueError:
            pass

    async def dist(self, mesg):
        '''
        Distribute an existing event tuple.

        Callbacks registered for the event name are called first, followed by all link() callbacks.
        An exception raised by a callback is logged and does not stop distribution.

        Args:
            mesg ((str,dict)): An event tuple.

        Returns:
            list: The return values of the callbacks which did not raise ( or an empty tuple if fini()d ).

        Example:

            await base.dist( ('foo',{'bar':'baz'}) )

        '''
        if self.isfini:
            return ()

        ret = []
        for func in self._syn_funcs.get(mesg[0], ()):

            try:
                ret.append(await s_coro.ornot(func, mesg))
            except asyncio.CancelledError:  # pragma: no cover  TODO:  remove once >= py 3.8 only
                raise
            except Exception:
                logger.exception('base %s error with mesg %s', self, mesg)

        for func in self._syn_links:

            try:
                ret.append(await s_coro.ornot(func, mesg))
            except asyncio.CancelledError:  # pragma: no cover  TODO:  remove once >= py 3.8 only
                raise
            except Exception:
                logger.exception('base %s error with mesg %s', self, mesg)

        return ret

    async def _kill_active_tasks(self):
        '''
        Cancel and await every task created via schedCoro() which is still running.
        '''
        if not self._active_tasks:
            return

        # iterate a copy: each task's done callback removes it from the live set
        for task in self._active_tasks.copy():

            task.cancel()
            try:
                await task
            except (asyncio.CancelledError, Exception):
                # The taskDone callback will emit the exception. No need to repeat
                pass

    async def fini(self):
        '''
        Shut down the object and notify any onfini() coroutines.

        Each call drops one reference; teardown only happens when the count reaches zero.  Teardown
        order is: Bases registered via onfini(), active tasks, entered context managers (in reverse
        order), then onfini() callbacks.  Failures in context manager exits and callbacks are logged.

        Returns:
            Remaining ref count ( or None if the Base was already fini()d ).
        '''
        assert self.anitted, f'{self.__class__.__name__} initialized improperly. Must use Base.anit class method.'

        if self.isfini:
            return

        if __debug__:
            import synapse.lib.threads as s_threads  # avoid import cycle
            assert s_threads.iden() == self.tid

        self._syn_refs -= 1
        if self._syn_refs > 0:
            return self._syn_refs

        self.isfini = True

        for base in list(self.tofini):
            await base.fini()

        # Do not continue to hold a reference to the last item we iterated on.
        base = None  # NOQA

        await self._kill_active_tasks()

        if self._context_managers is not None:

            for item in reversed(self._context_managers):

                # Mirror enter_context(): the async protocol takes precedence.
                aexit = getattr(item, '__aexit__', None)
                if aexit is not None:
                    try:
                        await aexit(None, None, None)
                    except Exception:
                        logger.exception(f'{self} {item} - context aexit failed!')
                    continue

                cmexit = getattr(item, '__exit__', None)
                if cmexit is not None:
                    try:
                        cmexit(None, None, None)
                    except Exception:
                        logger.exception(f'{self} {item} - context exit failed!')
                    continue

        for func in self._fini_funcs:
            try:
                await s_coro.ornot(func)
            except Exception:
                logger.exception(f'{self} - fini function failed: {func}')

        self._syn_funcs.clear()
        self._fini_funcs.clear()

        self.finievt.set()

        return 0

    @contextlib.contextmanager
    def onWith(self, evnt, func):
        '''
        A context manager which can be used to add a callback and remove it when
        using a ``with`` statement.

        Args:
            evnt (str):         An event name
            func (function):    A callback function to receive event tufo
        '''
        self.on(evnt, func)
        # Allow exceptions to propagate during the context manager
        # but ensure we cleanup our temporary callback
        try:
            yield self
        finally:
            self.off(evnt, func)

    def _wouldfini(self):
        '''Check if a Base would be fini() if fini() was called on it.'''
        return self._syn_refs == 1

    async def waitfini(self, timeout=None):
        '''
        Wait for the base to fini()

        Returns:
            None if timed out, True if fini happened

        Example:

            await base.waitfini(timeout=30)

        '''
        return await s_coro.event_wait(self.finievt, timeout)

    def schedCoro(self, coro):
        '''
        Schedules a free-running coroutine to run on this base's event loop.  It does not pend on coroutine completion.

        Args:
            coro: The coroutine to schedule.

        Notes:
            This function is *not* threadsafe and must be run on the Base's event loop.
            Tasks created by this function do inherit the synapse.lib.scope Scope from the current task.
            The task is cancelled when the Base is fini()d.

        Raises:
            s_exc.IsFini: If the Base has already been fini'd ( the coroutine is closed first ).

        Returns:
            asyncio.Task: An asyncio.Task object.

        '''
        if __debug__:
            assert inspect.isawaitable(coro)
            import synapse.lib.threads as s_threads  # avoid import cycle
            assert s_threads.iden() == self.tid

        if self.isfini:
            coro.close()
            raise s_exc.IsFini(mesg=f'Cannot schedCoro on a fini Base: {self}')

        if self._active_tasks is None:
            self._active_tasks = set()

        task = self.loop.create_task(coro)

        s_scope.clone(task)

        def taskDone(task):
            self._active_tasks.remove(task)
            try:
                task.result()
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception('Task %s scheduled through Base.schedCoro raised exception', task)

        self._active_tasks.add(task)
        task.add_done_callback(taskDone)

        return task

    def schedCallSafe(self, func, *args, **kwargs):
        '''
        Schedule a function to run as soon as possible on the same event loop that this Base is running on.

        This function does *not* pend on the function completion.

        Args:
            func: The function to call.
            *args: Positional arguments for func.
            **kwargs: Keyword arguments for func.

        Notes:
            This method may be called from outside of the event loop on a different thread.
            This function will break any task scoping done with synapse.lib.scope.

        Returns:
            asyncio.Handle: The handle returned by loop.call_soon_threadsafe(); it does not carry the return value.
        '''
        def real():
            return func(*args, **kwargs)

        return self.loop.call_soon_threadsafe(real)

    def schedCoroSafe(self, coro):
        '''
        Schedules a coroutine to run as soon as possible on the same event loop that this Base is running on.

        This function does *not* pend on coroutine completion.

        Notes:
            This method may be run outside the event loop on a different thread.
            This function will break any task scoping done with synapse.lib.scope.
            If the Base is fini'd before or at the time of dispatch, the coroutine is closed.

        Returns:
            asyncio.Handle: The handle returned by loop.call_soon_threadsafe(); it does not carry the coroutine result.
        '''
        def _safecall():
            if self.isfini:
                coro.close()
                return
            self.schedCoro(coro)

        return self.loop.call_soon_threadsafe(_safecall)

    def schedCoroSafePend(self, coro):
        '''
        Schedules a coroutine to run as soon as possible on the same event loop that this Base is running on,
        and blocks the calling thread until it completes.

        Note:
            This method may *not* be run inside an event loop

        Returns:
            The result of the coroutine.
        '''
        if __debug__:
            import synapse.lib.threads as s_threads  # avoid import cycle
            assert s_threads.iden() != self.tid

        task = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return task.result()

    async def addSignalHandlers(self):
        '''
        Register SIGTERM/SIGINT signal handlers with the ioloop to fini this object.
        '''
        if self._syn_signal_tasks is None:
            self._syn_signal_tasks = set()

        def _mkhandler(mesg):
            # Both signals do the same thing; only the logged message differs.
            def handler():
                logger.warning(mesg)
                task = asyncio.create_task(self.fini())
                # Keep a strong reference until the task is done so that it is
                # not garbage collected while fini() is still running.
                self._syn_signal_tasks.add(task)
                task.add_done_callback(self._syn_signal_tasks.discard)
            return handler

        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGINT, _mkhandler('Caught SIGINT, shutting down.'))
        loop.add_signal_handler(signal.SIGTERM, _mkhandler('Caught SIGTERM, shutting down.'))

    async def main(self, timeout=BASE_MAIN_BG_TASK_TIMEOUT):  # pragma: no cover
        '''
        Helper function to setup signal handlers for this base as the main object.
        ( use base.waitfini() to block )

        Args:
            timeout: Passed to s_coro.await_bg_tasks() once the Base has been fini()d.

        Note:
            This API may only be used when the ioloop is *also* the main thread.
        '''
        await self.addSignalHandlers()
        await self.waitfini()
        # shutdown logging to allow it to drain any queued messages it has, swapping in a stream handler,
        # and then cancelling the task so we do not have to await the pump task in bg tasks.
        await s_logging.shutdown()
        await s_coro.await_bg_tasks(timeout)

    def waiter(self, count, *names, timeout=None):
        '''
        Construct and return a new Waiter for events on this base.

        Example:

            # wait up to 3 seconds for 10 foo:bar events...

            waiter = base.waiter(10,'foo:bar')

            # .. fire task that will cause foo:bar events

            events = await waiter.wait(timeout=3)

            if events is None:
                # handle the timeout case...

            for event in events:
                # parse the events if you need...

        Note:
            Use this with caution. It's easy to accidentally construct
            race conditions with this mechanism ;)

        '''
        return Waiter(self, count, *names, timeout=timeout)
[docs] class Waiter: ''' A helper to wait for a given number of events on a Base. ''' def __init__(self, base, count, *names, timeout=None): self.base = base self.names = names self.count = count self.timeout = timeout self.event = asyncio.Event() self.events = [] for name in names: base.on(name, self._onWaitEvent) if not names: base.link(self._onWaitEvent) def _onWaitEvent(self, mesg): self.events.append(mesg) if len(self.events) >= self.count: self.event.set()
[docs] async def wait(self, timeout=None): ''' Wait for the required number of events and return them or None on timeout. Example: evnts = waiter.wait(timeout=30) if evnts == None: handleTimedOut() return for evnt in evnts: doStuff(evnt) ''' if timeout is None: timeout = self.timeout try: retn = await s_coro.event_wait(self.event, timeout) if not retn: return None return self.events finally: self.fini()
[docs] def fini(self): for name in self.names: self.base.off(name, self._onWaitEvent) if not self.names: self.base.unlink(self._onWaitEvent) del self.event
async def __aenter__(self): return self async def __aexit__(self, exc, cls, tb): if exc is None: if await self.wait() is None: # pragma: no cover # these lines are 100% covered by the tests but # the coverage plugin cannot seem to see them... events = ','.join(self.names) mesg = f'timeout waiting for {self.count} event(s): {events}' raise s_exc.TimeOut(mesg=mesg)
class BaseRef(Base):
    '''
    A registry which tracks multiple Base instances by name.

    When the BaseRef is fini()d, every Base it still holds is fini()d as well.
    '''
    async def __anit__(self, ctor=None):

        await Base.__anit__(self)

        self.ctor = ctor
        self.base_by_name = {}

        self.onfini(self._onBaseRefFini)

    async def _onBaseRefFini(self):
        # fini all of the tracked bases concurrently
        finis = [base.fini() for base in self.vals()]
        await asyncio.gather(*finis)

    def put(self, name, base):
        '''
        Add a Base (or sub-class) to the BaseRef by name.

        The entry removes itself again when the Base is fini()d, provided the name
        still maps to that same Base.

        Args:
            name (str): The name/iden of the Base
            base (Base): The Base instance

        Returns:
            (None)
        '''
        async def fini():
            # Remove myself from BaseRef when I fini ( unless the name was re-used )
            if self.base_by_name.get(name) is not base:
                return
            self.base_by_name.pop(name, None)

        base.onfini(fini)
        self.base_by_name[name] = base

    def pop(self, name):
        '''
        Remove and return a Base from the BaseRef.

        Args:
            name (str): The name/iden of the Base instance

        Returns:
            (Base): The named base ( or None )
        '''
        return self.base_by_name.pop(name, None)

    def get(self, name):
        '''
        Retrieve a Base instance by name.

        Args:
            name (str): The name/iden of the Base

        Returns:
            (Base): The Base instance (or None)
        '''
        return self.base_by_name.get(name)

    async def gen(self, name):
        '''
        Get the named Base ( incrementing its ref count ) or construct it with the ctor.
        (requires ctor during BaseRef init)

        NOTE(review): the ctor is awaited between the lookup and the put(), so concurrent
        gen() calls for the same new name may each construct a Base - confirm whether
        callers rely on this being atomic.

        Args:
            name (str): The name/iden of the Base instance.

        Raises:
            s_exc.NoSuchCtor: If the BaseRef was created without a ctor.
        '''
        if self.ctor is None:
            raise s_exc.NoSuchCtor(name=name, mesg='BaseRef.gen() requires ctor')

        existing = self.base_by_name.get(name)
        if existing is not None:
            existing.incref()
            return existing

        created = await self.ctor(name)
        self.put(name, created)
        return created

    def vals(self):
        '''Return a list copy of the tracked Base instances.'''
        return [*self.base_by_name.values()]

    def items(self):
        '''Return a list copy of the (name, Base) pairs.'''
        return [*self.base_by_name.items()]

    def __iter__(self):
        # iterate over a snapshot so that the dict may change
        # size while the caller is still iterating
        snapshot = [*self.base_by_name.values()]
        return iter(snapshot)
async def schedGenr(genr, maxsize=100):
    '''
    Run an async generator on its own task and yield its items from this task (pipelined generator).

    Items are handed over through a bounded queue of ``maxsize`` entries as ``(True, item)`` tuples;
    ``(False, None)`` marks the end of the stream.  When the producer ends, its task is awaited so
    that an exception raised by ``genr`` is re-raised to the consumer.
    '''
    queue = asyncio.Queue(maxsize=maxsize)

    async def genrtask(base):
        try:
            async for item in genr:
                await queue.put((True, item))

            await queue.put((False, None))

        except Exception:
            # wake the consumer ( if it is still around ) so it can observe the failure
            if not base.isfini:
                await queue.put((False, None))
            raise

    async with await Base.anit() as base:

        task = base.schedCoro(genrtask(base))

        while not base.isfini:

            ok, retn = await queue.get()

            if not ok:
                # end of stream: surface any producer exception and stop
                await task
                return

            yield retn

            # since we are a pipeline, yield every time...
            await asyncio.sleep(0)
async def main(coro):  # pragma: no cover
    '''
    Await ``coro`` and, when it produced a Base, run Base.main() on it inside an
    ``async with`` block so that the Base is fini()d on the way out.
    '''
    base = await coro
    if not isinstance(base, Base):
        return

    async with base:
        await base.main()