Source code for synapse.lib.spawner

import asyncio
import logging
import tempfile

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.daemon as s_daemon
import synapse.telepath as s_telepath

import synapse.lib.base as s_base
import synapse.lib.link as s_link
import synapse.lib.const as s_const
import synapse.lib.process as s_process

logger = logging.getLogger(__name__)

def _ioWorkProc(todo, sockpath):

    async def workloop():

        async with await s_daemon.Daemon.anit() as dmon:

            func, args, kwargs = todo

            async with await func(*args, **kwargs) as item:

                assert isinstance(item, s_base.Base), f'Item is not a Base object: {item}'

                await item.addSignalHandlers()

                dmon.share('dmon', dmon)
                dmon.share('item', item)

                # bind last so we're ready to go...
                try:
                    await dmon.listen(f'unix://{sockpath}')
                except OSError:
                    logger.exception(f'IO worker failed to open listening socket at [{sockpath}]')
                    raise
                await item.waitfini()

    asyncio.run(workloop())

async def _spawnerWait(sockpath, timeout=30):

    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            await asyncio.wait_for(s_link.unixwait(sockpath), timeout=1)
            return
        except asyncio.TimeoutError:
            if loop.time() > deadline:
                raise s_exc.FatalErr(
                    mesg=f'IO worker failed to open listening socket at [{sockpath}] within {timeout}s'
                )

[docs] class SpawnerMixin:
[docs] @classmethod def spawner(cls, base=None, sockpath=None): async def _spawn(*args, **kwargs): return await cls._spawn(args, kwargs, base=base, sockpath=sockpath) return _spawn
@classmethod async def _spawn(cls, args, kwargs, base=None, sockpath=None): todo = (cls.anit, args, kwargs) iden = s_common.guid() if sockpath is not None and len(sockpath) > s_const.UNIX_SOCKET_PATH_MAX: sockpath = None if sockpath is None: tmpdir = tempfile.gettempdir() sockpath = s_common.genpath(tmpdir, iden) if base is None: base = await s_base.Base.anit() base.schedCoro(s_process.spawn((_ioWorkProc, (todo, sockpath), {}))) await _spawnerWait(sockpath) proxy = await s_telepath.openurl(f'unix://{sockpath}:item') async def fini(): try: async with await s_telepath.openurl(f'unix://{sockpath}:item') as finiproxy: await finiproxy.task(('fini', (), {})) except (s_exc.LinkErr, s_exc.NoSuchPath, asyncio.CancelledError): # This can fail if the subprocess was terminated from outside... pass if not base.isfini: logger.error(f'IO Worker Socket Closed: {sockpath}') await base.fini() proxy.onfini(fini) base.onfini(proxy) return proxy