import gc
import os
import sys
import atexit
import signal
import asyncio
import inspect
import logging
import weakref
import contextlib
import collections
if __debug__:
import traceback
import synapse.exc as s_exc
import synapse.glob as s_glob
import synapse.lib.coro as s_coro
import synapse.lib.scope as s_scope
import synapse.lib.logging as s_logging
logger = logging.getLogger(__name__)
OMIT_FINI_WARNS = os.environ.get('SYNDEV_OMIT_FINI_WARNS', False)
BASE_MAIN_BG_TASK_TIMEOUT = int(os.environ.get('SYNDEV_BASE_MAIN_BG_TASK_TIMEOUT', 3))
def _fini_atexit(): # pragma: no cover
for item in gc.get_objects():
if not isinstance(item, Base):
continue
if not item.anitted:
continue
if item.isfini:
continue
if not item._fini_atexit and not OMIT_FINI_WARNS:
if __debug__:
logger.debug(f'At exit: Missing fini for {item}')
for depth, call in enumerate(item.call_stack[:-2]):
logger.debug(f'{depth + 1:3}: {call.strip()}')
continue
try:
if __debug__:
logger.debug('At exit: Calling fini for %r', item)
rv = item.fini()
if s_coro.iscoro(rv):
# Try to run the fini on its loop
loop = item.loop
if not loop.is_running():
continue
loop.create_task(rv)
except Exception:
logger.exception('atexit fini fail: %r' % (item,))
atexit.register(_fini_atexit)
[docs]
class Base:
'''
Base class for Synapse objects.
Acts as an observable, enables async init and fini.
Example:
class Foo(Base):
async def __anit__(self, x, y):
await Base.__anit__(self)
await stuff(x, y)
foo = await Foo.anit(10)
Note:
One should not create instances directly via its initializer, i.e. Base(). One shall always use the class
method anit.
'''
def __init__(self):
self.anitted = False
assert sys._getframe(1).f_code.co_name == 'anit', 'Objects from Base must be constructed solely via "anit"'
[docs]
@classmethod
async def anit(cls, *args, **kwargs):
# sneak in a quick loop check here for convenience
if s_glob._glob_loop is None:
s_glob.initloop()
self = cls()
try:
await self.__anit__(*args, **kwargs)
except (asyncio.CancelledError, Exception):
if self.anitted:
await self.fini()
raise
try:
await self.postAnit()
except (asyncio.CancelledError, Exception):
logger.exception('Error during postAnit callback.')
await self.fini()
raise
return self
async def __anit__(self):
self.loop = asyncio.get_running_loop()
if __debug__:
import synapse.lib.threads as s_threads # avoid import cycle
self.tid = s_threads.iden()
self.call_stack = traceback.format_stack() # For cleanup debugging
if object.__getattribute__(self, 'anitted') is True:
# The Base has already been anitted. This allows a class to treat
# multiple Base objects as a mixin and __anit__ themselves without
# smashing fini or event handlers from the others.
return
self.isfini = False
self.anitted = True # For assertion purposes
self.finievt = asyncio.Event()
# hold a weak ref to other bases we should fini if they
# are still around when we go down...
self.tofini = weakref.WeakSet()
self._syn_funcs = collections.defaultdict(list)
self._syn_refs = 1 # one ref for the ctor
self._syn_links = []
self._fini_funcs = []
self._fini_atexit = False
self._active_tasks = None # the set of free running tasks associated with me
self._context_managers = None # the set of context managers i must fini
self._syn_signal_tasks = None # initialized as a Set when addSignalHandlers is called.
[docs]
async def postAnit(self):
'''
Method called after self.__anit__() has completed, but before anit() returns the object to the caller.
'''
pass
[docs]
async def enter_context(self, item):
'''
Modeled on Python's contextlib.ExitStack.enter_context. Enters a new context manager and adds its __exit__()
and __aexit__ method to its onfini handlers.
Returns:
The result of item’s own __aenter__ or __enter__() method.
'''
if self.isfini: # pragma: no cover
mesg = 'Cannot enter_context on a fini()d object.'
raise s_exc.IsFini(mesg=mesg)
if self._context_managers is None:
self._context_managers = []
self._context_managers.append(item)
entr = getattr(item, '__aenter__', None)
if entr is not None:
return await entr()
entr = getattr(item, '__enter__', None)
assert entr is not None
return entr()
[docs]
def onfini(self, func):
'''
Add a function/coroutine/Base to be called on fini().
The rules around how to register function/coroutine/Base to be called:
- Call this method with an instance of Base (this class) if holding
a reference to a bound method of the instance (such as a fini()
method) would cause the object to be leaked. This is appropriate
for ephemeral objects that may be constructed/destroyed multiple
times over the lifetime of a process.
- Call this method with an instance method if you want the object to
have a lifetime as long as the thing being fini'd.
'''
if self.isfini:
if isinstance(func, Base):
s_coro.create_task(func.fini())
else:
s_coro.create_task(s_coro.ornot(func))
return
if isinstance(func, Base):
self.tofini.add(func)
return
assert self.anitted
self._fini_funcs.append(func)
async def __aenter__(self):
assert asyncio.get_running_loop() == self.loop
return self
async def __aexit__(self, exc, cls, tb):
# Either there should be no running loop or we shall be on the right one
try:
assert asyncio.get_running_loop() == self.loop
except RuntimeError:
pass
await self.fini()
[docs]
def incref(self):
'''
Increment the reference count for this base. This API may be optionally used to control fini().
'''
self._syn_refs += 1
return self._syn_refs
[docs]
def link(self, func):
'''
Add a callback function to receive *all* events.
Example:
base1 = Base()
base2 = Base()
base1.link( base2.dist )
# all events on base1 are also propagated on base2
'''
self._syn_links.append(func)
[docs]
def unlink(self, func):
'''
Remove a callback function previously added with link()
Example:
base.unlink( callback )
'''
if func in self._syn_links:
self._syn_links.remove(func)
[docs]
def on(self, evnt, func, base=None):
'''
Add an base function callback for a specific event with optional filtering. If the function returns a
coroutine, it will be awaited.
Args:
evnt (str): An event name
func (function): A callback function to receive event tufo
Examples:
Add a callback function and fire it:
async def baz(event):
x = event[1].get('x')
y = event[1].get('y')
return x + y
d.on('foo', baz)
# this fire triggers baz...
await d.fire('foo', x=10, y=20)
Returns:
None:
'''
funcs = self._syn_funcs[evnt]
if func in funcs:
return
funcs.append(func)
if base is not None:
def fini():
self.off(evnt, func)
base.onfini(fini)
[docs]
def off(self, evnt, func):
'''
Remove a previously registered event handler function.
Example:
base.off( 'foo', onFooFunc )
'''
funcs = self._syn_funcs.get(evnt)
if funcs is None:
return
try:
funcs.remove(func)
except ValueError:
pass
[docs]
async def fire(self, evtname, **info):
'''
Fire the given event name on the Base.
Returns a list of the return values of each callback.
Example:
for ret in d.fire('woot',foo='asdf'):
print('got: %r' % (ret,))
'''
event = (evtname, info)
if self.isfini:
return event
await self.dist(event)
return event
[docs]
async def dist(self, mesg):
'''
Distribute an existing event tuple.
Args:
mesg ((str,dict)): An event tuple.
Example:
await base.dist( ('foo',{'bar':'baz'}) )
'''
if self.isfini:
return ()
ret = []
for func in self._syn_funcs.get(mesg[0], ()):
try:
ret.append(await s_coro.ornot(func, mesg))
except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
raise
except Exception:
logger.exception('base %s error with mesg %s', self, mesg)
for func in self._syn_links:
try:
ret.append(await s_coro.ornot(func, mesg))
except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
raise
except Exception:
logger.exception('base %s error with mesg %s', self, mesg)
return ret
async def _kill_active_tasks(self):
if not self._active_tasks:
return
for task in self._active_tasks.copy():
task.cancel()
try:
await task
except (asyncio.CancelledError, Exception):
# The taskDone callback will emit the exception. No need to repeat
pass
[docs]
async def fini(self):
'''
Shut down the object and notify any onfini() coroutines.
Returns:
Remaining ref count
'''
assert self.anitted, f'{self.__class__.__name__} initialized improperly. Must use Base.anit class method.'
if self.isfini:
return
if __debug__:
import synapse.lib.threads as s_threads # avoid import cycle
assert s_threads.iden() == self.tid
self._syn_refs -= 1
if self._syn_refs > 0:
return self._syn_refs
self.isfini = True
for base in list(self.tofini):
await base.fini()
# Do not continue to hold a reference to the last item we iterated on.
base = None # NOQA
await self._kill_active_tasks()
if self._context_managers is not None:
for item in reversed(self._context_managers):
exit = getattr(item, '__aexit__', None)
if exit is not None:
try:
await exit(None, None, None)
except Exception:
logger.exception(f'{self} {item} - context aexit failed!')
continue
exit = getattr(item, '__exit__', None)
if exit is not None:
try:
exit(None, None, None)
except Exception:
logger.exception(f'{self} {item} - context exit failed!')
continue
for fini in self._fini_funcs:
try:
await s_coro.ornot(fini)
except Exception:
logger.exception(f'{self} - fini function failed: {fini}')
self._syn_funcs.clear()
self._fini_funcs.clear()
self.finievt.set()
return 0
[docs]
@contextlib.contextmanager
def onWith(self, evnt, func):
'''
A context manager which can be used to add a callback and remove it when
using a ``with`` statement.
Args:
evnt (str): An event name
func (function): A callback function to receive event tufo
'''
self.on(evnt, func)
# Allow exceptions to propagate during the context manager
# but ensure we cleanup our temporary callback
try:
yield self
finally:
self.off(evnt, func)
def _wouldfini(self):
'''Check if a Base would be fini() if fini() was called on it.'''
return self._syn_refs == 1
[docs]
async def waitfini(self, timeout=None):
'''
Wait for the base to fini()
Returns:
None if timed out, True if fini happened
Example:
base.waitfini(timeout=30)
'''
return await s_coro.event_wait(self.finievt, timeout)
[docs]
def schedCoro(self, coro):
'''
Schedules a free-running coroutine to run on this base's event loop.
It does not pend on coroutine completion.
Args:
coro: The coroutine to schedule.
Notes:
This function is *not* threadsafe and must be run on the Base's event loop.
Tasks created by this function do inherit the synapse.lib.scope Scope from the current task.
Raises IsFini if the Base has already been fini'd.
Returns:
asyncio.Task: An asyncio.Task object.
'''
if __debug__:
assert inspect.isawaitable(coro)
import synapse.lib.threads as s_threads # avoid import cycle
assert s_threads.iden() == self.tid
if self.isfini:
coro.close()
raise s_exc.IsFini(mesg=f'Cannot schedCoro on a fini Base: {self}')
if self._active_tasks is None:
self._active_tasks = set()
task = self.loop.create_task(coro)
s_scope.clone(task)
def taskDone(task):
self._active_tasks.remove(task)
try:
task.result()
except asyncio.CancelledError:
pass
except Exception:
logger.exception('Task %s scheduled through Base.schedCoro raised exception', task)
self._active_tasks.add(task)
task.add_done_callback(taskDone)
return task
[docs]
def schedCallSafe(self, func, *args, **kwargs):
'''
Schedule a function to run as soon as possible on the same event loop that this Base is running on.
This function does *not* pend on the function completion.
Args:
func:
*args:
**kwargs:
Notes:
This method may be called from outside of the event loop on a different thread.
This function will break any task scoping done with synapse.lib.scope.
Returns:
concurrent.futures.Future: A Future representing the eventual function execution.
'''
def real():
return func(*args, **kwargs)
return self.loop.call_soon_threadsafe(real)
[docs]
def schedCoroSafe(self, coro):
'''
Schedules a coroutine to run as soon as possible on the same event loop that this Base is running on.
This function does *not* pend on coroutine completion.
Notes:
This method may be run outside the event loop on a different thread.
This function will break any task scoping done with synapse.lib.scope.
If the Base is fini'd before or at the time of dispatch, the coroutine is closed.
Returns:
concurrent.futures.Future: A Future representing the eventual coroutine execution.
'''
def _safecall():
if self.isfini:
coro.close()
return
self.schedCoro(coro)
return self.loop.call_soon_threadsafe(_safecall)
[docs]
def schedCoroSafePend(self, coro):
'''
Schedules a coroutine to run as soon as possible on the same event loop that this Base is running on
Note:
This method may *not* be run inside an event loop
'''
if __debug__:
import synapse.lib.threads as s_threads # avoid import cycle
assert s_threads.iden() != self.tid
task = asyncio.run_coroutine_threadsafe(coro, self.loop)
return task.result()
[docs]
async def addSignalHandlers(self):
'''
Register SIGTERM/SIGINT signal handlers with the ioloop to fini this object.
'''
if self._syn_signal_tasks is None:
self._syn_signal_tasks = set()
def sigterm():
logger.warning('Caught SIGTERM, shutting down.')
task = asyncio.create_task(self.fini())
self._syn_signal_tasks.add(task)
task.add_done_callback(self._syn_signal_tasks.discard)
def sigint():
logger.warning('Caught SIGINT, shutting down.')
task = asyncio.create_task(self.fini())
self._syn_signal_tasks.add(task)
task.add_done_callback(self._syn_signal_tasks.discard)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, sigint)
loop.add_signal_handler(signal.SIGTERM, sigterm)
[docs]
async def main(self, timeout=BASE_MAIN_BG_TASK_TIMEOUT): # pragma: no cover
'''
Helper function to setup signal handlers for this base as the main object.
( use base.waitfini() to block )
Note:
This API may only be used when the ioloop is *also* the main thread.
'''
await self.addSignalHandlers()
await self.waitfini()
# shutdown logging to allow it to drain any queued messages it has, swapping in a stream handler,
# and then cancellling the task so we do not have to await the pump task in bg tasks.
await s_logging.shutdown()
await s_coro.await_bg_tasks(timeout)
[docs]
def waiter(self, count, *names, timeout=None):
'''
Construct and return a new Waiter for events on this base.
Example:
# wait up to 3 seconds for 10 foo:bar events...
waiter = base.waiter(10,'foo:bar')
# .. fire task that will cause foo:bar events
events = await waiter.wait(timeout=3)
if events == None:
# handle the timeout case...
for event in events:
# parse the events if you need...
Note:
Use this with caution. It's easy to accidentally construct
race conditions with this mechanism ;)
'''
return Waiter(self, count, *names, timeout=timeout)
[docs]
class Waiter:
'''
A helper to wait for a given number of events on a Base.
'''
def __init__(self, base, count, *names, timeout=None):
self.base = base
self.names = names
self.count = count
self.timeout = timeout
self.event = asyncio.Event()
self.events = []
for name in names:
base.on(name, self._onWaitEvent)
if not names:
base.link(self._onWaitEvent)
def _onWaitEvent(self, mesg):
self.events.append(mesg)
if len(self.events) >= self.count:
self.event.set()
[docs]
async def wait(self, timeout=None):
'''
Wait for the required number of events and return them or None on timeout.
Example:
evnts = waiter.wait(timeout=30)
if evnts == None:
handleTimedOut()
return
for evnt in evnts:
doStuff(evnt)
'''
if timeout is None:
timeout = self.timeout
try:
retn = await s_coro.event_wait(self.event, timeout)
if not retn:
return None
return self.events
finally:
self.fini()
[docs]
def fini(self):
for name in self.names:
self.base.off(name, self._onWaitEvent)
if not self.names:
self.base.unlink(self._onWaitEvent)
del self.event
async def __aenter__(self):
return self
async def __aexit__(self, exc, cls, tb):
if exc is None:
if await self.wait() is None: # pragma: no cover
# these lines are 100% covered by the tests but
# the coverage plugin cannot seem to see them...
events = ','.join(self.names)
mesg = f'timeout waiting for {self.count} event(s): {events}'
raise s_exc.TimeOut(mesg=mesg)
[docs]
class BaseRef(Base):
'''
An object for managing multiple Base instances by name.
'''
async def __anit__(self, ctor=None):
await Base.__anit__(self)
self.ctor = ctor
self.base_by_name = {}
self.onfini(self._onBaseRefFini)
async def _onBaseRefFini(self):
await asyncio.gather(*[base.fini() for base in self.vals()])
[docs]
def put(self, name, base):
'''
Add a Base (or sub-class) to the BaseRef by name.
Args:
name (str): The name/iden of the Base
base (Base): The Base instance
Returns:
(None)
'''
async def fini():
if self.base_by_name.get(name) is base:
self.base_by_name.pop(name, None)
# Remove myself from BaseRef when I fini
base.onfini(fini)
self.base_by_name[name] = base
[docs]
def pop(self, name):
'''
Remove and return a Base from the BaseRef.
Args:
name (str): The name/iden of the Base instance
Returns:
(Base): The named base ( or None )
'''
return self.base_by_name.pop(name, None)
[docs]
def get(self, name):
'''
Retrieve a Base instance by name.
Args:
name (str): The name/iden of the Base
Returns:
(Base): The Base instance (or None)
'''
return self.base_by_name.get(name)
[docs]
async def gen(self, name):
'''
Atomically get/gen a Base and incref.
(requires ctor during BaseRef init)
Args:
name (str): The name/iden of the Base instance.
'''
if self.ctor is None:
raise s_exc.NoSuchCtor(name=name, mesg='BaseRef.gen() requires ctor')
base = self.base_by_name.get(name)
if base is None:
base = await self.ctor(name)
self.put(name, base)
else:
base.incref()
return base
[docs]
def vals(self):
return list(self.base_by_name.values())
[docs]
def items(self):
return list(self.base_by_name.items())
def __iter__(self):
# make a copy during iteration to prevent dict
# change during iteration exceptions
return iter(list(self.base_by_name.values()))
[docs]
async def schedGenr(genr, maxsize=100):
'''
Schedule a generator to run on a separate task and yield results to this task (pipelined generator).
'''
q = asyncio.Queue(maxsize=maxsize)
async def genrtask(base):
try:
async for item in genr:
await q.put((True, item))
await q.put((False, None))
except Exception:
if not base.isfini:
await q.put((False, None))
raise
async with await Base.anit() as base:
task = base.schedCoro(genrtask(base))
while not base.isfini:
ok, retn = await q.get()
if ok:
yield retn
# since we are a pipeline, yield every time...
await asyncio.sleep(0)
continue
await task
return
[docs]
async def main(coro): # pragma: no cover
base = await coro
if isinstance(base, Base):
async with base:
await base.main()