Source code for synapse.lib.process

'''
Process spawning utilities.
'''
import queue
import asyncio
import logging
import multiprocessing

logger = logging.getLogger(__name__)

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.coro as s_coro

def _exectodo(que, todo, logconf):
    # This is a new process: configure logging
    s_common.setlogging(logger, **logconf)
    func, args, kwargs = todo
    try:
        ret = func(*args, **kwargs)
        que.put(ret)
    except s_exc.SynErr as e:
        logger.exception(f'Error executing spawn function {func}')
        que.put(e)
    except Exception as e:
        # exceptions could be non-pickleable so wrap in SynErr
        logger.exception(f'Error executing spawn function {func}')
        name, info = s_common.err(e)
        mesg = f'Error executing spawn function: {name}: {info.get("mesg")}'
        exc = s_exc.SynErr(mesg=mesg, name=name, info=info)
        que.put(exc)

[docs] async def spawn(todo, timeout=None, ctx=None, log_conf=None): ''' Run a todo (func, args, kwargs) tuple in a multiprocessing subprocess. Args: todo (tuple): A tuple of function, ``*args``, and ``**kwargs``. timeout (int): The timeout to wait for the todo function to finish. ctx (multiprocess.Context): A optional multiprocessing context object. log_conf (dict): An optional logging configuration for the spawned process. Notes: The contents of the todo tuple must be able to be pickled for execution. This means that locally bound functions are not eligible targets for spawn. Returns: The return value of executing the todo function. ''' if ctx is None: ctx = multiprocessing.get_context('spawn') if log_conf is None: log_conf = {} que = ctx.Queue() proc = ctx.Process(target=_exectodo, args=(que, todo, log_conf)) def execspawn(): proc.start() while True: try: # we have to block/wait on the queue because the sender # could need to stream the return value in multiple chunks retn = que.get(timeout=1) # now that we've retrieved the response, it should have exited. proc.join() return retn except queue.Empty: if not proc.is_alive(): proc.join() mesg = f'Spawned process {proc} exited for {todo[0]} without a result.' raise s_exc.SpawnExit(mesg=mesg, code=proc.exitcode) try: coro = s_coro.executor(execspawn) retn = await s_common.wait_for(coro, timeout=timeout) if isinstance(retn, Exception): raise retn return retn except (asyncio.CancelledError, asyncio.TimeoutError): proc.terminate() raise