import copy
import asyncio
import logging
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.base as s_base
logger = logging.getLogger(__name__)
[docs]
class Task(s_base.Base):
    '''
    The synapse Task object implements concepts similar to process trees
    for asyncio.Task instances.

    Each Task wraps a single asyncio task, registers itself by iden in
    ``boss.tasks`` and, when given a root, in ``root.kids``. Tearing down a
    Task tears down its kids first and then cancels the wrapped asyncio task.
    '''
    async def __anit__(self, boss, task, name, user, info=None, root=None, iden=None):
        '''
        Initialize the Task and register it with the boss (and optional root).

        Args:
            boss: Owner object; this class uses its ``tasks`` dict and its
                ``schedCoro()`` / ``schedCoroSafe()`` methods.
            task: The asyncio task being wrapped.
            name: A name for the task.
            user: A user object exposing a ``name`` attribute, or None.
            info (dict): Optional metadata; a new empty dict is used if omitted.
            root (Task): Optional parent Task that will own this one as a kid.
            iden (str): Optional guid for the task; one is generated if omitted.

        Raises:
            s_exc.BadArg: If iden is not a string, is not a valid guid, or is
                already present in ``boss.tasks``.
        '''
        await s_base.Base.__anit__(self)

        if info is None:
            info = {}

        self.boss = boss

        if iden is not None:
            if not isinstance(iden, str):
                mesg = 'The task iden specified must be a string.'
                raise s_exc.BadArg(mesg=mesg)
            if not s_common.isguid(iden):
                mesg = 'The task iden specified must match the regular expression: ^[a-f0-9]{32}$'
                raise s_exc.BadArg(mesg=mesg)

        self.iden = iden
        if self.iden is None:
            self.iden = s_common.guid()

        self.task = task  # the real task...
        self.tick = s_common.now()
        self.name = name
        self.user = user
        self.root = root
        self.info = info

        self.kids = {}

        if self.boss.tasks.get(self.iden) is not None:
            mesg = 'Specified task iden already exists!'
            raise s_exc.BadArg(mesg=mesg, iden=iden)

        self.boss.tasks[self.iden] = self
        if root is not None:
            root.kids[self.iden] = self

        # Bind the asyncio task to this object only once every check has
        # passed, so a failed init never leaves the asyncio task pointing at a
        # partially constructed, unregistered Task.
        task._syn_task = self

        self.task.add_done_callback(self._onTaskDone)
        self.onfini(self._onTaskFini)

    def __repr__(self):
        # A task without a user is displayed as belonging to 'root'.
        user = 'root' if self.user is None else self.user.name
        return 'task: %s (%s) %r' % (self.iden, user, self.info)

    def _onTaskDone(self, t):
        '''
        Done-callback for the wrapped asyncio task: schedule our own fini.
        '''
        if not self.isfini:
            self.boss.schedCoroSafe(self.fini())

    async def _onTaskFini(self):
        '''
        Fini handler: tear down kids, cancel the wrapped task, and deregister.
        '''
        # Iterate over a snapshot: each kid removes itself from self.kids
        # while it is being torn down.
        for kid in list(self.kids.values()):
            await kid.fini()

        self.task.cancel()
        try:
            await self.task
        except asyncio.CancelledError:
            pass
        except Exception:  # pragma: no cover
            logger.exception(f'Task {self.name} completed with exception')

        if self.root is not None:
            self.root.kids.pop(self.iden)

        self.boss.tasks.pop(self.iden)

    async def worker(self, coro, name='worker'):
        '''
        Schedule a coroutine as a child Task of this Task.

        Args:
            coro: The coroutine to schedule via ``boss.schedCoro()``.
            name (str): The name given to the child Task.
        '''
        task = self.boss.schedCoro(coro)
        # Passing root=self makes __anit__ register the child in self.kids,
        # so no separate bookkeeping is needed here.
        await Task.anit(self.boss, task, name, self.user, root=self)

    async def kill(self):
        '''
        Terminate the task.
        '''
        # task kill and fini are the same...
        await self.fini()

    def pack(self):
        '''
        Return a dict describing this Task and, recursively, its kids.

        Returns:
            dict: With the keys ``iden``, ``name``, ``info`` (a deep copy),
            ``tick``, ``user`` (the user name, or 'root' when there is no
            user) and ``kids`` (iden -> packed kid).
        '''
        return {
            'iden': self.iden,
            'name': self.name,
            'info': copy.deepcopy(self.info),
            'tick': self.tick,
            'user': 'root' if self.user is None else self.user.name,
            'kids': {i: k.pack() for i, k in self.kids.items()},
        }
[docs]
def loop():
    '''
    Return the running asyncio event loop, or None if there is not one.
    '''
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop is running.
        return None
[docs]
def current():
    '''
    Return the synapse task attached to the current asyncio task.

    Returns None when the current asyncio task carries no ``_syn_task``.
    '''
    return getattr(asyncio.current_task(), '_syn_task', None)
[docs]
def user():
    '''
    Return the user of the current synapse task.

    Returns None when there is no current synapse task.
    '''
    syntask = current()
    if syntask is None:
        return None
    return syntask.user
[docs]
def username():
    '''
    Return the name of the current task user.

    Returns None when there is no current task user.
    '''
    who = user()
    if who is None:
        return None
    return who.name
[docs]
async def executor(func, *args, **kwargs):
    '''
    Execute a function in an executor thread.

    Args:
        func: The callable to run.
        *args: Positional arguments passed to func.
        **kwargs: Keyword arguments passed to func.

    Returns:
        The value returned by ``func(*args, **kwargs)``.
    '''
    evloop = asyncio.get_running_loop()
    # run_in_executor() only forwards positional arguments, so the complete
    # call is wrapped in a zero-argument callable. Passing None selects the
    # loop's default executor.
    return await evloop.run_in_executor(None, lambda: func(*args, **kwargs))
# Task vars: task-local variables
# Registry mapping a task-local variable name to a zero-argument callable.
# vardefault() stores entries here; varget() calls the matching entry to build
# (and cache on the task) a value for a name that has not been set yet.
_TaskDictCtors = {} # type: ignore
[docs]
def varinit(task=None):
    '''
    Give a task a fresh, empty set of task-local variables.

    Any variables previously stored on the task are discarded, which also
    makes this usable for re-initialization in tests.

    Args:
        task: The task to initialize. If None, the current asyncio task is used.

    Returns:
        dict: The new (empty) task-local variable dict.

    Precondition:
        If task is None, this must be called from task context
    '''
    target = asyncio.current_task() if task is None else task
    fresh = {}
    target._syn_taskvars = fresh
    return fresh
def _taskdict(task):
    '''
    Return the task-local variable dict for a task, creating it if needed.

    If task is None, the current asyncio task is used.

    Note: No locking is provided. Under normal circumstances this is fine,
    for example when task is the current task, or when the other task is not
    running because this is called from the same event loop as that task.
    '''
    if task is None:
        task = asyncio.current_task()

    assert task

    existing = getattr(task, '_syn_taskvars', None)
    if existing is not None:
        return existing

    # First access for this task: set up its variable dict.
    return varinit(task)
[docs]
def varget(name, defval=None, task=None):
    '''
    Access a task local variable by name

    If the variable is not set but a default constructor is registered for
    the name, the constructor is called and its result is stored on the task
    and returned. Otherwise defval is returned (and nothing is stored).

    Args:
        name: The variable name.
        defval: Value returned when the variable is unset and has no constructor.
        task: If task is None, uses current task

    Precondition:
        If task is None, this must be called from task context
    '''
    taskvars = _taskdict(task)

    # NoValu is the "missing" sentinel so that a stored None is still returned.
    valu = taskvars.get(name, s_common.NoValu)
    if valu is s_common.NoValu:
        ctor = _TaskDictCtors.get(name)
        if ctor is None:
            return defval
        valu = ctor()
        taskvars[name] = valu

    return valu
[docs]
def varset(name, valu, task=None):
    '''
    Set a task-local variable

    Args:
        name: The variable name.
        valu: The value to store.
        task: If task is None, uses current task

    Precondition:
        If task is None, this must be called from task context
    '''
    taskvars = _taskdict(task)
    taskvars[name] = valu
[docs]
def vardefault(name, func):
    '''
    Add a default constructor for a particular task-local variable

    After registration, a varget() call for the same name on a task where
    the variable is not yet set calls func() and stores and returns the result.

    Args:
        name: The variable name.
        func: A callable taking no arguments that produces the default value.
    '''
    _TaskDictCtors.update({name: func})