'''
Process pool related utilities.

Importing this module has the side effect of creating a forkserver, so it
should be imported early in the startup of any process which needs it.
'''
import os
import atexit
import asyncio
import logging
import multiprocessing
import concurrent.futures

import synapse.exc as s_exc
import synapse.common as s_common

import synapse.lib.process as s_process

logger = logging.getLogger(__name__)

forkpool = None
forkpool_sema = None

max_workers = None
def_max_workers = 8
reserved_workers = 2

if multiprocessing.current_process().name == 'MainProcess':
    # only create the forkpools in the MainProcess...
    try:
        mpctx = multiprocessing.get_context('forkserver')
        max_workers = int(os.getenv('SYN_FORKED_WORKERS', 0)) or max(def_max_workers, os.cpu_count() or def_max_workers)
        forkpool = concurrent.futures.ProcessPoolExecutor(mp_context=mpctx, max_workers=max_workers)
        atexit.register(forkpool.shutdown)
        forkpool_sema = asyncio.Semaphore(max(1, max_workers - reserved_workers))
    except OSError as e: # pragma: no cover
        max_workers = None
        logger.warning(f'Failed to init forkserver pool, fallback enabled: {e}', exc_info=True)
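
# Worker-count arithmetic, as a worked example: with SYN_FORKED_WORKERS unset
# on a 4-core host, max_workers resolves to max(8, 4) == 8 and the semaphore
# admits 8 - 2 == 6 concurrent semafork() callers; setting SYN_FORKED_WORKERS=16
# in the environment before process start pins the pool at 16 workers instead.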

def _runtodo(todo): # pragma: no cover
    return todo[0](*todo[1], **todo[2])

def _init_pool_worker(logger_, logconf): # pragma: no cover
    s_common.setlogging(logger_, **logconf)
    p = multiprocessing.current_process()
    logger.debug(f'Initialized new forkserver pool worker: name={p.name} pid={p.ident}')
_pool_logconf = None

def set_pool_logging(logger_, logconf):
    # This must be called before any calls to forked() and _parserforked()
    global _pool_logconf
    _pool_logconf = logconf

    todo = s_common.todo(_init_pool_worker, logger_, logconf)
    if forkpool is not None:
        forkpool._initializer = _runtodo
        forkpool._initargs = (todo,)
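
# A minimal usage sketch. The logconf dict below is hypothetical; real values
# are whatever keyword arguments s_common.setlogging() accepts in the caller's
# deployment:
#
#   set_pool_logging(logging.getLogger('synapse'), {'defval': 'DEBUG'})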

async def forked(func, *args, **kwargs):
    '''
    Execute a target function in the shared forked process pool
    and fall back to running in a spawned process if the pool is unavailable.

    Args:
        func: The target function.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments.

    Returns:
        The target function return.
    '''
    todo = (func, args, kwargs)
    if forkpool is not None:
        try:
            return await asyncio.get_running_loop().run_in_executor(forkpool, _runtodo, todo)
        except concurrent.futures.process.BrokenProcessPool as e: # pragma: no cover
            logger.exception(f'Shared forkserver pool is broken, fallback enabled: {func}')

    logger.debug(f'Forkserver pool using spawn fallback: {func}')
    return await s_process.spawn(todo, log_conf=_pool_logconf)
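
# A minimal usage sketch. The target must be a picklable, module-level
# callable so the pool worker can import it; crunch() is a hypothetical
# CPU-bound function, not part of this module:
#
#   def crunch(n):
#       return sum(i * i for i in range(n))
#
#   async def main():
#       total = await forked(crunch, 1_000_000)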

async def semafork(func, *args, **kwargs):
    '''
    Execute a target function in the shared forked process pool
    gated by a semaphore to ensure there are workers reserved for the Storm parser.

    Args:
        func: The target function.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments.

    Returns:
        The target function return.
    '''
    if forkpool_sema is None:
        return await forked(func, *args, **kwargs)

    async with forkpool_sema:
        return await forked(func, *args, **kwargs)
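
# Back-pressure sketch: with the defaults above (8 workers, 2 reserved), at
# most 6 semafork() calls run at once; a seventh awaits the semaphore until
# one completes, so parser tasks always find a free worker. Reusing the
# hypothetical crunch() from the sketch above:
#
#   results = await asyncio.gather(*(semafork(crunch, n) for n in range(100)))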

async def _parserforked(func, *args, **kwargs):
    '''
    Execute a target function in the shared forked process pool
    and fall back to running in the default executor if the pool is unavailable.

    NOTE: This function is intended to only be used by the Storm parser.

    Args:
        func: The target function.
        *args: Function positional arguments.
        **kwargs: Function keyword arguments.

    Returns:
        The target function return.

    Raises:
        This may raise from the target function, or raise an s_exc.FatalErr in the
        event of a broken forked process pool. The FatalErr represents an
        unrecoverable application state.
    '''
    todo = (func, args, kwargs)
    # When forkpool is None, run_in_executor() treats the None executor as the
    # event loop's default executor, which provides the documented fallback.
    try:
        return await asyncio.get_running_loop().run_in_executor(forkpool, _runtodo, todo)
    except concurrent.futures.process.BrokenProcessPool as e: # pragma: no cover
        logger.exception(f'Fatal error executing forked task: {func} {args} {kwargs}')
        raise s_exc.FatalErr(mesg=f'Fatal error encountered: {e}') from None
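
# Caller-side handling sketch (parsefunc is hypothetical; the Storm parser is
# the only intended caller):
#
#   try:
#       tree = await _parserforked(parsefunc, text)
#   except s_exc.FatalErr:
#       raise  # unrecoverable; the shared pool cannot be rebuilt in-process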