import asyncio
import logging
import synapse.exc as s_exc
import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.task as s_task
logger = logging.getLogger(__name__)
[docs]
class Boss(s_base.Base):
'''
An object to track "promoted" async tasks.
Promoted tasks are asyncio tasks, wrapped in a synapse task
(``s_task.Task``), that are visible to storm users via the task tracking
libs/commands such as ``ps.list`` and ``$lib.ps.list()``.
'''
async def __anit__(self):
await s_base.Base.__anit__(self)
self.tasks = {}
self.is_shutdown = False
self.shutdown_lock = asyncio.Lock()
self.onfini(self._onBossFini)
[docs]
async def shutdown(self, timeout=None, drain=True):
'''
Initiate a shutdown of the Boss by stopping promotion of any new tasks
and either awaiting or cancelling top-level promoted tasks.
Background tasks and child tasks are not awaited or cancelled.
Args:
timeout: Optional total timeout in seconds for reaping tasks.
The timeout is shared across all tasks; if it is reached
before every task is reaped the shutdown is aborted. ``None``
blocks indefinitely.
drain: If True (the default), top-level tasks are awaited until
they complete on their own. If False, top-level tasks are
cancelled before being awaited.
Returns:
bool: True if all eligible tasks were reaped before the timeout
was reached; False otherwise.
'''
self.reqNotShut()
async with self.shutdown_lock:
toplevel = [task for task in self.tasks.values()
if task.root is None and not task.background]
if not drain:
for task in toplevel:
task.task.cancel()
remaining = s_coro.deadline(timeout)
for task in toplevel:
if not await s_coro.waittask(task.task, timeout=remaining()):
return False
self.is_shutdown = True
return True
[docs]
def reqNotShut(self, mesg=None):
if self.shutdown_lock.locked():
if mesg is None:
mesg = 'The service is shutting down.'
raise s_exc.ShuttingDown(mesg=mesg)
if self.is_shutdown:
if mesg is None:
mesg = 'The service is shut down.'
raise s_exc.ShuttingDown(mesg=mesg)
async def _onBossFini(self):
for task in list(self.tasks.values()):
await task.kill(safe=False)
[docs]
def ps(self):
# top level tasks only...
return [t for t in self.tasks.values() if t.root is None]
[docs]
def get(self, iden):
return self.tasks.get(iden)
[docs]
async def execute(self, coro, name, user, info=None, iden=None):
'''
Create a synapse task from the given coroutine.
'''
self.reqNotShut()
task = self.schedCoro(coro)
return await s_task.Task.anit(self, task, name, user, info=info, iden=iden)