import gc
import os
import ssl
import copy
import time
import fcntl
import shutil
import signal
import socket
import asyncio
import logging
import tarfile
import argparse
import datetime
import platform
import tempfile
import functools
import contextlib
import multiprocessing
import aiohttp
import tornado.web as t_web
import tornado.log as t_log
import synapse.exc as s_exc
import synapse.data as s_data
import synapse.common as s_common
import synapse.daemon as s_daemon
import synapse.telepath as s_telepath
import synapse.lib.auth as s_auth
import synapse.lib.base as s_base
import synapse.lib.boss as s_boss
import synapse.lib.coro as s_coro
import synapse.lib.hive as s_hive
import synapse.lib.link as s_link
import synapse.lib.task as s_task
import synapse.lib.cache as s_cache
import synapse.lib.const as s_const
import synapse.lib.drive as s_drive
import synapse.lib.nexus as s_nexus
import synapse.lib.queue as s_queue
import synapse.lib.scope as s_scope
import synapse.lib.config as s_config
import synapse.lib.health as s_health
import synapse.lib.output as s_output
import synapse.lib.certdir as s_certdir
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.httpapi as s_httpapi
import synapse.lib.msgpack as s_msgpack
import synapse.lib.schemas as s_schemas
import synapse.lib.spooled as s_spooled
import synapse.lib.urlhelp as s_urlhelp
import synapse.lib.version as s_version
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.thisplat as s_thisplat
import synapse.lib.crypto.passwd as s_passwd
import synapse.tools.backup as s_t_backup
logger = logging.getLogger(__name__)
NEXUS_VERSION = (2, 177)
SLAB_MAP_SIZE = 128 * s_const.mebibyte
SSLCTX_CACHE_SIZE = 64
'''
Base classes for the synapse "cell" microservice architecture.
'''
PERM_DENY = 0
PERM_READ = 1
PERM_EDIT = 2
PERM_ADMIN = 3
permnames = {
PERM_DENY: 'deny',
PERM_READ: 'read',
PERM_EDIT: 'edit',
PERM_ADMIN: 'admin',
}
diskspace = "Insufficient free space on disk."
def adminapi(log=False):
'''
Decorator for CellApi (and subclasses) for requiring a method to be called only by an admin user.
Args:
log (bool): If set to True, log the user, function and arguments.
'''
def decrfunc(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
if args[0].user is not None and not args[0].user.isAdmin():
raise s_exc.AuthDeny(mesg='User is not an admin.',
user=args[0].user.name)
if log:
logger.info('Executing [%s] as [%s] with args [%s][%s]',
func.__qualname__, args[0].user.name, args[1:], kwargs)
return func(*args, **kwargs)
wrapped.__syn_wrapped__ = 'adminapi'
return wrapped
return decrfunc
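# A minimal usage sketch for @adminapi (hypothetical subclass; WidgetApi and
# delWidgets are illustrative and not part of this module):
#
#     class WidgetApi(CellApi):
#
#         @adminapi(log=True)
#         async def delWidgets(self):
#             # Raises s_exc.AuthDeny unless the calling user is an admin.
#             return await self.cell.delWidgets()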
def from_leader(func):
'''
Decorator used to indicate that the decorated method must call up to the
leader to perform its work.
This only works on Cell classes and subclasses. The decorated method name
MUST be the same as the telepath API name.
'''
@functools.wraps(func)
async def wrapper(self, *args, **kwargs):
if not self.isactive:
proxy = await self.nexsroot.client.proxy()
api = getattr(proxy, func.__name__)
return await api(*args, **kwargs)
return await func(self, *args, **kwargs)
return wrapper
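# A minimal usage sketch for @from_leader (hypothetical Cell subclass;
# WidgetCell and getWidgetSnap are illustrative). On a mirror, the call is
# transparently proxied to the leader under the same telepath API name:
#
#     class WidgetCell(Cell):
#
#         @from_leader
#         async def getWidgetSnap(self):
#             return self._genWidgetSnap()  # hypothetical local helper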
async def _doIterBackup(path, chunksize=1024):
'''
Create tarball and stream bytes.
Args:
path (str): Path to the backup.
Yields:
(bytes): File bytes
'''
output_filename = path + '.tar.gz'
link0, file1 = await s_link.linkfile()
def dowrite(fd):
# TODO: When we are 3.12+ convert this back to w|gz - see https://github.com/python/cpython/pull/2962
with tarfile.open(output_filename, 'w:gz', fileobj=fd, compresslevel=1) as tar:
tar.add(path, arcname=os.path.basename(path))
fd.close()
coro = s_coro.executor(dowrite, file1)
while True:
byts = await link0.recv(chunksize)
if not byts:
break
yield byts
await coro
await link0.fini()
async def _iterBackupWork(path, linkinfo):
'''
Inner setup for backup streaming.
Args:
path (str): Path to the backup.
linkinfo(dict): Link info dictionary.
Returns:
None: Returns None.
'''
logger.info(f'Getting backup streaming link for [{path}].')
link = await s_link.fromspawn(linkinfo)
await s_daemon.t2call(link, _doIterBackup, (path,), {})
await link.fini()
logger.info(f'Backup streaming for [{path}] completed.')
def _iterBackupProc(path, linkinfo):
'''
Multiprocessing target for streaming a backup.
'''
# This logging call is okay to run since we're executing in
# our own process space and no logging has been configured.
s_common.setlogging(logger, **linkinfo.get('logconf'))
logger.info(f'Backup streaming process for [{path}] starting.')
asyncio.run(_iterBackupWork(path, linkinfo))
class CellApi(s_base.Base):
async def __anit__(self, cell, link, user):
await s_base.Base.__anit__(self)
self.cell = cell
self.link = link
assert user
self.user = user
self.sess = self.link.get('sess') # type: s_daemon.Sess
self.sess.user = user
await self.initCellApi()
async def initCellApi(self):
pass
@adminapi(log=True)
async def freeze(self, timeout=30):
return await self.cell.freeze(timeout=timeout)
@adminapi(log=True)
async def resume(self):
return await self.cell.resume()
async def allowed(self, perm, default=None):
'''
Check if the user has the requested permission.
Args:
perm: permission path components to check
default: Value returned if no value stored
Examples:
Form a path and check the permission from a remote proxy::
perm = ('node', 'add', 'inet:ipv4')
allowed = await prox.allowed(perm)
if allowed:
dostuff()
Returns:
Optional[bool]: True if the user has permission, False if explicitly denied, None if no entry
'''
return self.user.allowed(perm, default=default)
async def _reqUserAllowed(self, perm):
'''
Helper method that subclasses can use for user permission checking.
Args:
perm: permission path components to check
Notes:
This can be used to require a permission; and will throw an exception if the permission is not allowed.
Examples:
Implement an API that requires a user to have a specific permission in order to execute it::
async def makeWidget(self, wvalu, wtype):
# This will throw if the user doesn't have the appropriate widget permission
await self._reqUserAllowed(('widget', wtype))
return await self.cell.makeWidget((wvalu, wtype))
Returns:
None: This API does not return anything. It only throws an exception on failure.
Raises:
s_exc.AuthDeny: If the permission is not allowed.
'''
if not await self.allowed(perm):
perm = '.'.join(perm)
mesg = f'User must have permission {perm}'
raise s_exc.AuthDeny(mesg=mesg, perm=perm, username=self.user.name, user=self.user.iden)
def getCellType(self):
return self.cell.getCellType()
def getCellIden(self):
return self.cell.getCellIden()
async def getCellRunId(self):
return await self.cell.getCellRunId()
async def isCellActive(self):
'''
Returns True if the cell is an active/leader cell.
'''
return await self.cell.isCellActive()
def getPermDef(self, perm):
'''
Return a specific permission definition.
'''
return self.cell.getPermDef(perm)
def getPermDefs(self):
'''
Return a non-comprehensive list of perm definitions.
'''
return self.cell.getPermDefs()
@adminapi()
def getNexsIndx(self):
return self.cell.getNexsIndx()
@adminapi()
async def readyToMirror(self):
return await self.cell.readyToMirror()
@adminapi()
async def getMirrorUrls(self):
return await self.cell.getMirrorUrls()
@adminapi(log=True)
async def cullNexsLog(self, offs):
'''
Remove Nexus log entries up to (and including) the given offset.
Note:
If there are consumers of this cell's nexus log they must
be caught up to at least the offs argument before culling.
Only rotated logs where the last index is less than the
provided offset will be removed from disk.
Args:
offs (int): The offset to remove entries up to.
Returns:
bool: Whether the cull was executed
'''
return await self.cell.cullNexsLog(offs)
@adminapi(log=True)
async def rotateNexsLog(self):
'''
Rotate the Nexus log at the current offset.
Returns:
int: The starting index of the active Nexus log
'''
return await self.cell.rotateNexsLog()
@adminapi(log=True)
async def trimNexsLog(self, consumers=None, timeout=60):
'''
Rotate and cull the Nexus log (and those of any consumers) at the current offset.
Note:
If the consumers argument is provided, each consumer is first checked
to confirm it is online before rotating; an exception is raised otherwise.
After rotation, every consumer must catch up to the cull offset before
the cull is executed; an exception is raised otherwise.
Args:
consumers (list or None): Optional list of telepath URLs for downstream Nexus log consumers.
timeout (int): Time in seconds to wait for downstream consumers to be caught up.
Returns:
int: The offset that the Nexus log was culled up to and including.
'''
return await self.cell.trimNexsLog(consumers=consumers, timeout=timeout)
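# A usage sketch from an admin telepath proxy (the AHA URLs for two
# downstream mirrors are illustrative):
#
#     consumers = ('aha://01.cell.loop.vertex.link', 'aha://02.cell.loop.vertex.link')
#     offs = await prox.trimNexsLog(consumers=consumers, timeout=120)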
@adminapi()
async def waitNexsOffs(self, offs, timeout=None):
'''
Wait for the Nexus log to write an offset.
Args:
offs (int): The offset to wait for.
timeout (int or None): An optional timeout in seconds.
Returns:
bool: True if the offset was written, False if it timed out.
'''
return await self.cell.waitNexsOffs(offs, timeout=timeout)
@adminapi(log=True)
async def handoff(self, turl, timeout=30):
return await self.cell.handoff(turl, timeout=timeout)
def getCellUser(self):
return self.user.pack()
async def getCellInfo(self):
return await self.cell.getCellInfo()
@adminapi()
async def getSystemInfo(self):
'''
Get info about the system in which the cell is running
Returns:
A dictionary with the following keys. All size values are in bytes:
- volsize - Total space on the volume where the cell is running
- volfree - Free space on the volume where the cell is running
- backupvolsize - Total space on the backup directory volume
- backupvolfree - Free space on the backup directory volume
- celluptime - Cell uptime in milliseconds
- cellrealdisk - Cell's actual disk usage, equivalent to du
- cellapprdisk - Cell's apparent disk usage, equivalent to ls -l
- osversion - OS version/architecture
- pyversion - Python version
- totalmem - Total memory in the system
- availmem - Available memory in the system
'''
return await self.cell.getSystemInfo()
def setCellUser(self, iden):
'''
Switch to another user (admin only).
This API allows remote admin/service accounts
to impersonate a user. Used mostly by services
that manage their own authentication/sessions.
'''
if not self.user.isAdmin():
mesg = 'setCellUser() caller must be admin.'
raise s_exc.AuthDeny(mesg=mesg, user=self.user.iden, username=self.user.name)
user = self.cell.auth.user(iden)
if user is None:
raise s_exc.NoSuchUser(mesg=f'Unable to set cell user iden to {iden}', user=iden)
if user.isLocked():
raise s_exc.AuthDeny(mesg=f'User ({user.name}) is locked.', user=user.iden, username=user.name)
self.user = user
self.link.get('sess').user = user
return True
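# A usage sketch: an admin service account impersonating another user on an
# existing telepath proxy (the iden value is illustrative):
#
#     await prox.setCellUser('5fa41c1a0bd5965f04a41417119b80f4')
#     # Subsequent calls on this proxy are authorized as the new user.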
async def ps(self):
return await self.cell.ps(self.user)
async def kill(self, iden):
return await self.cell.kill(self.user, iden)
@adminapi(log=True)
async def behold(self):
'''
Yield Cell system messages
'''
async for mesg in self.cell.behold():
yield mesg
@adminapi(log=True)
async def addUser(self, name, passwd=None, email=None, iden=None):
return await self.cell.addUser(name, passwd=passwd, email=email, iden=iden)
@adminapi(log=True)
async def delUser(self, iden):
return await self.cell.delUser(iden)
@adminapi(log=True)
async def addRole(self, name, iden=None):
return await self.cell.addRole(name, iden=iden)
@adminapi(log=True)
async def delRole(self, iden):
return await self.cell.delRole(iden)
@adminapi()
async def dyncall(self, iden, todo, gatekeys=()):
return await self.cell.dyncall(iden, todo, gatekeys=gatekeys)
@adminapi()
async def dyniter(self, iden, todo, gatekeys=()):
async for item in self.cell.dyniter(iden, todo, gatekeys=gatekeys):
yield item
@adminapi()
async def issue(self, nexsiden: str, event: str, args, kwargs, meta=None):
return await self.cell.nexsroot.issue(nexsiden, event, args, kwargs, meta)
@adminapi(log=True)
async def delAuthUser(self, name):
await self.cell.auth.delUser(name)
@adminapi(log=True)
async def addAuthRole(self, name):
role = await self.cell.auth.addRole(name)
return role.pack()
@adminapi(log=True)
async def delAuthRole(self, name):
await self.cell.auth.delRole(name)
@adminapi()
async def getAuthUsers(self, archived=False):
'''
Args:
archived (bool): If true, list all users, else list non-archived users
'''
return await self.cell.getAuthUsers(archived=archived)
@adminapi()
async def getAuthRoles(self):
return await self.cell.getAuthRoles()
@adminapi(log=True)
async def addUserRule(self, iden, rule, indx=None, gateiden=None):
return await self.cell.addUserRule(iden, rule, indx=indx, gateiden=gateiden)
@adminapi(log=True)
async def setUserRules(self, iden, rules, gateiden=None):
return await self.cell.setUserRules(iden, rules, gateiden=gateiden)
@adminapi(log=True)
async def setRoleRules(self, iden, rules, gateiden=None):
return await self.cell.setRoleRules(iden, rules, gateiden=gateiden)
@adminapi(log=True)
async def addRoleRule(self, iden, rule, indx=None, gateiden=None):
return await self.cell.addRoleRule(iden, rule, indx=indx, gateiden=gateiden)
@adminapi(log=True)
async def delUserRule(self, iden, rule, gateiden=None):
return await self.cell.delUserRule(iden, rule, gateiden=gateiden)
@adminapi(log=True)
async def delRoleRule(self, iden, rule, gateiden=None):
return await self.cell.delRoleRule(iden, rule, gateiden=gateiden)
@adminapi(log=True)
async def setUserAdmin(self, iden, admin, gateiden=None):
return await self.cell.setUserAdmin(iden, admin, gateiden=gateiden)
@adminapi()
async def getAuthInfo(self, name):
'''This API is deprecated.'''
s_common.deprecated('CellApi.getAuthInfo')
user = await self.cell.auth.getUserByName(name)
if user is not None:
info = user.pack()
info['roles'] = [self.cell.auth.role(r).name for r in info['roles']]
return info
role = await self.cell.auth.getRoleByName(name)
if role is not None:
return role.pack()
raise s_exc.NoSuchName(name=name)
@adminapi(log=True)
async def addAuthRule(self, name, rule, indx=None, gateiden=None):
'''This API is deprecated.'''
s_common.deprecated('CellApi.addAuthRule')
item = await self.cell.auth.getUserByName(name)
if item is None:
item = await self.cell.auth.getRoleByName(name)
await item.addRule(rule, indx=indx, gateiden=gateiden)
@adminapi(log=True)
async def delAuthRule(self, name, rule, gateiden=None):
'''This API is deprecated.'''
s_common.deprecated('CellApi.delAuthRule')
item = await self.cell.auth.getUserByName(name)
if item is None:
item = await self.cell.auth.getRoleByName(name)
await item.delRule(rule, gateiden=gateiden)
@adminapi(log=True)
async def setAuthAdmin(self, name, isadmin):
'''This API is deprecated.'''
s_common.deprecated('CellApi.setAuthAdmin')
item = await self.cell.auth.getUserByName(name)
if item is None:
item = await self.cell.auth.getRoleByName(name)
await item.setAdmin(isadmin)
async def setUserPasswd(self, iden, passwd):
await self.cell.auth.reqUser(iden)
if self.user.iden == iden:
self.user.confirm(('auth', 'self', 'set', 'passwd'), default=True)
return await self.cell.setUserPasswd(iden, passwd)
self.user.confirm(('auth', 'user', 'set', 'passwd'))
return await self.cell.setUserPasswd(iden, passwd)
@adminapi()
async def genUserOnepass(self, iden, duration=60000):
return await self.cell.genUserOnepass(iden, duration)
@adminapi(log=True)
async def setUserLocked(self, useriden, locked):
return await self.cell.setUserLocked(useriden, locked)
@adminapi(log=True)
async def setUserArchived(self, useriden, archived):
return await self.cell.setUserArchived(useriden, archived)
@adminapi(log=True)
async def setUserEmail(self, useriden, email):
return await self.cell.setUserEmail(useriden, email)
@adminapi(log=True)
async def addUserRole(self, useriden, roleiden, indx=None):
return await self.cell.addUserRole(useriden, roleiden, indx=indx)
@adminapi(log=True)
async def setUserRoles(self, useriden, roleidens):
return await self.cell.setUserRoles(useriden, roleidens)
@adminapi(log=True)
async def delUserRole(self, useriden, roleiden):
return await self.cell.delUserRole(useriden, roleiden)
async def getUserInfo(self, name):
user = await self.cell.auth.reqUserByName(name)
if self.user.isAdmin() or self.user.iden == user.iden:
info = user.pack()
info['roles'] = [self.cell.auth.role(r).name for r in info['roles']]
return info
mesg = 'getUserInfo denied for non-admin and non-self'
raise s_exc.AuthDeny(mesg=mesg, user=self.user.iden, username=self.user.name)
async def getRoleInfo(self, name):
role = await self.cell.auth.reqRoleByName(name)
if self.user.isAdmin() or role.iden in self.user.info.get('roles', ()):
return role.pack()
mesg = 'getRoleInfo denied for non-admin and non-member'
raise s_exc.AuthDeny(mesg=mesg, user=self.user.iden, username=self.user.name)
@adminapi()
async def getUserDef(self, iden, packroles=True):
return await self.cell.getUserDef(iden, packroles=packroles)
@adminapi()
async def getAuthGate(self, iden):
return await self.cell.getAuthGate(iden)
@adminapi()
async def getAuthGates(self):
return await self.cell.getAuthGates()
@adminapi()
async def getRoleDef(self, iden):
return await self.cell.getRoleDef(iden)
@adminapi()
async def getUserDefByName(self, name):
return await self.cell.getUserDefByName(name)
@adminapi()
async def getRoleDefByName(self, name):
return await self.cell.getRoleDefByName(name)
@adminapi()
async def getUserDefs(self):
return await self.cell.getUserDefs()
@adminapi()
async def getRoleDefs(self):
return await self.cell.getRoleDefs()
@adminapi()
async def isUserAllowed(self, iden, perm, gateiden=None, default=False):
return await self.cell.isUserAllowed(iden, perm, gateiden=gateiden, default=default)
@adminapi()
async def isRoleAllowed(self, iden, perm, gateiden=None):
return await self.cell.isRoleAllowed(iden, perm, gateiden=gateiden)
@adminapi()
async def tryUserPasswd(self, name, passwd):
return await self.cell.tryUserPasswd(name, passwd)
@adminapi()
async def getUserProfile(self, iden):
return await self.cell.getUserProfile(iden)
@adminapi()
async def getUserProfInfo(self, iden, name):
return await self.cell.getUserProfInfo(iden, name)
@adminapi()
async def setUserProfInfo(self, iden, name, valu):
return await self.cell.setUserProfInfo(iden, name, valu)
@adminapi()
async def popUserProfInfo(self, iden, name, default=None):
return await self.cell.popUserProfInfo(iden, name, default=default)
@adminapi()
async def checkUserApiKey(self, key):
return await self.cell.checkUserApiKey(key)
async def getHealthCheck(self):
await self._reqUserAllowed(('health',))
return await self.cell.getHealthCheck()
@adminapi()
async def getDmonSessions(self):
return await self.cell.getDmonSessions()
@adminapi()
async def listHiveKey(self, path=None):
s_common.deprecated('CellApi.listHiveKey', curv='2.167.0')
return await self.cell.listHiveKey(path=path)
@adminapi(log=True)
async def getHiveKeys(self, path):
s_common.deprecated('CellApi.getHiveKeys', curv='2.167.0')
return await self.cell.getHiveKeys(path)
@adminapi(log=True)
async def getHiveKey(self, path):
s_common.deprecated('CellApi.getHiveKey', curv='2.167.0')
return await self.cell.getHiveKey(path)
@adminapi(log=True)
async def setHiveKey(self, path, valu):
s_common.deprecated('CellApi.setHiveKey', curv='2.167.0')
return await self.cell.setHiveKey(path, valu)
@adminapi(log=True)
async def popHiveKey(self, path):
s_common.deprecated('CellApi.popHiveKey', curv='2.167.0')
return await self.cell.popHiveKey(path)
@adminapi(log=True)
async def saveHiveTree(self, path=()):
s_common.deprecated('CellApi.saveHiveTree', curv='2.167.0')
return await self.cell.saveHiveTree(path=path)
@adminapi()
async def getNexusChanges(self, offs, tellready=False):
async for item in self.cell.getNexusChanges(offs, tellready=tellready):
yield item
@adminapi()
async def runBackup(self, name=None, wait=True):
'''
Run a new backup.
Args:
name (str): The optional name of the backup.
wait (bool): On True, wait for backup to complete before returning.
Returns:
str: The name of the newly created backup.
'''
return await self.cell.runBackup(name=name, wait=wait)
@adminapi()
async def getBackupInfo(self):
'''
Get information about recent backup activity.
Returns:
(dict) It has the following keys:
- currduration - If a backup is currently running, the time in ms since it started, otherwise None
- laststart - Last time (in epoch milliseconds) a backup started
- lastend - Last time (in epoch milliseconds) a backup ended
- lastduration - How long the last backup took in ms
- lastsize - Disk usage of the last completed backup
- lastupload - Last time (in epoch milliseconds) a backup finished uploading via iterBackupArchive or iterNewBackupArchive
- lastexception - Tuple of exception information if the last backup failed, otherwise None
Note: these statistics are not persistent, i.e. they are not preserved between cell restarts.
'''
return await self.cell.getBackupInfo()
@adminapi()
async def getBackups(self):
'''
Retrieve a list of backups.
Returns:
list[str]: A list of backup names.
'''
return await self.cell.getBackups()
@adminapi()
async def delBackup(self, name):
'''
Delete a backup by name.
Args:
name (str): The name of the backup to delete.
'''
return await self.cell.delBackup(name)
@adminapi()
async def iterBackupArchive(self, name):
'''
Retrieve a backup by name as a compressed stream of bytes.
Note: Compression and streaming will occur from a separate process.
Args:
name (str): The name of the backup to retrieve.
'''
await self.cell.iterBackupArchive(name, user=self.user)
# Make this a generator
if False: # pragma: no cover
yield
@adminapi()
async def iterNewBackupArchive(self, name=None, remove=False):
'''
Run a new backup and return it as a compressed stream of bytes.
Note: Compression and streaming will occur from a separate process.
Args:
name (str): The name of the backup to retrieve.
remove (bool): Delete the backup after streaming.
'''
await self.cell.iterNewBackupArchive(user=self.user, name=name, remove=remove)
# Make this a generator
if False: # pragma: no cover
yield
@adminapi()
async def getDiagInfo(self):
return {
'slabs': await s_lmdbslab.Slab.getSlabStats(),
}
@adminapi()
async def runGcCollect(self, generation=2):
'''
For diagnostic purposes only!
NOTE: This API is *not* supported and can be removed at any time!
'''
return gc.collect(generation=generation)
@adminapi()
async def getGcInfo(self):
'''
For diagnostic purposes only!
NOTE: This API is *not* supported and can be removed at any time!
'''
return {
'stats': gc.get_stats(),
'threshold': gc.get_threshold(),
}
@adminapi()
async def getReloadableSystems(self):
return self.cell.getReloadableSystems()
@adminapi(log=True)
async def reload(self, subsystem=None):
return await self.cell.reload(subsystem=subsystem)
class Cell(s_nexus.Pusher, s_telepath.Aware):
'''
A Cell() implements a synapse micro-service.
A Cell has 5 phases of startup:
1. Universal cell data structures
2. Service specific storage/data (pre-nexs)
3. Nexus subsystem initialization
4. Service specific startup (with nexus)
5. Networking and mirror services
'''
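# A minimal subclass sketch (illustrative names) hooking the startup phases:
#
#     class WidgetCell(Cell):
#
#         async def initServiceStorage(self):  # phase 2: service storage
#             self.widgets = self.slab.getSafeKeyVal('widgets')
#
#         async def initServiceRuntime(self):  # phase 4: runs with nexus up
#             await self._loadWidgetDefs()     # hypothetical startup logic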
cellapi = CellApi
confdefs = {} # type: ignore # This should be a JSONSchema properties list for an object.
confbase = {
'cell:guid': {
'description': 'An optional hard-coded GUID to store as the permanent GUID for the service.',
'type': 'string',
'hideconf': True,
},
'cell:ctor': {
'description': 'An optional python path to the Cell class. Used by stemcell.',
'type': 'string',
'hideconf': True,
},
'mirror': {
'description': 'A telepath URL for our upstream mirror (we must be a backup!).',
'type': ['string', 'null'],
'hidedocs': True,
'hidecmdl': True,
},
'auth:passwd': {
'description': 'Set to <passwd> (local only) to bootstrap the root user password.',
'type': 'string',
},
'auth:anon': {
'description': 'Allow anonymous telepath access by mapping to the given user name.',
'type': 'string',
},
'auth:ctor': {
'description': 'Allow the construction of the cell auth object to be hooked at runtime.',
'type': 'string',
'hideconf': True,
},
'auth:conf': {
'description': 'Extended configuration to be used by an alternate auth constructor.',
'type': 'object',
'hideconf': True,
},
'auth:passwd:policy': {
'description': 'Specify password policy/complexity requirements.',
'type': 'object',
},
'max:users': {
'default': 0,
'description': 'Maximum number of users allowed on the system, not including root or locked/archived users (0 means no limit).',
'type': 'integer',
'minimum': 0
},
'nexslog:en': {
'default': False,
'description': 'Record all changes to a stream file on disk. Required for mirroring (on both sides).',
'type': 'boolean',
},
'nexslog:async': {
'default': True,
'description': 'Deprecated. This option is ignored.',
'type': 'boolean',
'hidedocs': True,
'hidecmdl': True,
},
'dmon:listen': {
'description': 'A config-driven way to specify the telepath bind URL.',
'type': ['string', 'null'],
},
'https:port': {
'description': 'A config-driven way to specify the HTTPS port.',
'type': ['integer', 'null'],
},
'https:headers': {
'description': 'Headers to add to all HTTPS server responses.',
'type': 'object',
'hidecmdl': True,
},
'https:parse:proxy:remoteip': {
'description': 'Enable the HTTPS server to parse X-Forwarded-For and X-Real-IP headers to determine requester IP addresses.',
'type': 'boolean',
'default': False,
},
'backup:dir': {
'description': 'A directory outside the service directory where backups will be saved. Defaults to ./backups in the service storage directory.',
'type': 'string',
},
'onboot:optimize': {
'default': False,
'description': 'Delay startup to optimize LMDB databases during boot to recover free space and increase performance. This may take a while.',
'type': 'boolean',
},
'limit:disk:free': {
'default': 5,
'description': 'Minimum disk free space percentage before setting the cell read-only.',
'type': ['integer', 'null'],
'minimum': 0,
'maximum': 100,
},
'health:sysctl:checks': {
'default': True,
'description': 'Enable sysctl parameter checks and warn if values are not optimal.',
'type': 'boolean',
},
'aha:name': {
'description': 'The name of the cell service in the aha service registry.',
'type': 'string',
},
'aha:user': {
'description': 'The username of this service when connecting to others.',
'type': 'string',
},
'aha:leader': {
'description': 'The AHA service name to claim as the active instance of a storm service.',
'type': 'string',
},
'aha:network': {
'description': 'The AHA service network.',
'type': 'string',
},
'aha:registry': {
'description': 'The telepath URL of the aha service registry.',
'type': ['string', 'array'],
'items': {'type': 'string'},
},
'aha:provision': {
'description': 'The telepath URL of the aha provisioning service.',
'type': ['string', 'array'],
'items': {'type': 'string'},
},
'aha:admin': {
'description': 'An AHA client certificate CN to register as a local admin user.',
'type': 'string',
},
'aha:svcinfo': {
'description': 'An AHA svcinfo object. If set, this overrides self-discovered AHA service information.',
'type': 'object',
'properties': {
'urlinfo': {
'type': 'object',
'properties': {
'host': {'type': 'string'},
'port': {'type': 'integer'},
'scheme': {'type': 'string'}
},
'required': ('host', 'port', 'scheme', )
}
},
'required': ('urlinfo', ),
'hidedocs': True,
'hidecmdl': True,
},
'inaugural': {
'description': 'Data used to drive configuration of the service upon first startup.',
'type': 'object',
'properties': {
'roles': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'name': {'type': 'string',
'pattern': '^(?!all$).+$',
},
'rules': {
'type': 'array',
'items': {
'type': 'array',
'items': [
{'type': 'boolean'},
{'type': 'array', 'items': {'type': 'string'}, },
],
'minItems': 2,
'maxItems': 2,
},
}
},
'required': ['name', ],
'additionalProperties': False,
}
},
'users': {
'type': 'array',
'items': {
'type': 'object',
'properties': {
'name': {'type': 'string',
'pattern': '^(?!root$).+$',
},
'admin': {'type': 'boolean', 'default': False, },
'email': {'type': 'string', },
'roles': {
'type': 'array',
'items': {'type': 'string'},
},
'rules': {
'type': 'array',
'items': {
'type': 'array',
'items': [
{'type': 'boolean'},
{'type': 'array', 'items': {'type': 'string'}, },
],
'minItems': 2,
'maxItems': 2,
},
},
},
'required': ['name', ],
'additionalProperties': False,
}
}
},
'hidedocs': True,
},
'_log_conf': {
'description': 'Opaque structure used for logging by spawned processes.',
'type': 'object',
'hideconf': True
}
}
BACKUP_SPAWN_TIMEOUT = 60.0
FREE_SPACE_CHECK_FREQ = 60.0
COMMIT = s_version.commit
VERSION = s_version.version
VERSTRING = s_version.verstring
SYSCTL_VALS = {
'vm.dirty_expire_centisecs': 20,
'vm.dirty_writeback_centisecs': 20,
}
SYSCTL_CHECK_FREQ = 60.0
async def __anit__(self, dirn, conf=None, readonly=False, parent=None):
# phase 1
if conf is None:
conf = {}
self.starttime = time.monotonic() # Used for uptime calc
self.startms = s_common.now() # Used to report start time
s_telepath.Aware.__init__(self)
self.dirn = s_common.gendir(dirn)
self.runid = s_common.guid()
self.auth = None
self.cellparent = parent
self.sessions = {}
self.paused = False
self.isactive = False
self.activebase = None
self.inaugural = False
self.activecoros = {}
self.sockaddr = None # Default value...
self.https_listeners = []
self.ahaclient = None
self._checkspace = s_coro.Event()
self._reloadfuncs = {} # name -> func
self.nexslock = asyncio.Lock()
self.netready = asyncio.Event()
self.conf = self._initCellConf(conf)
self.minfree = self.conf.get('limit:disk:free')
if self.minfree is not None:
self.minfree = self.minfree / 100
disk = shutil.disk_usage(self.dirn)
if (disk.free / disk.total) <= self.minfree:
free = disk.free / disk.total * 100
mesg = f'Free space on {self.dirn} below minimum threshold (currently {free:.2f}%)'
raise s_exc.LowSpace(mesg=mesg, dirn=self.dirn)
self._delTmpFiles()
if self.conf.get('onboot:optimize'):
await self._onBootOptimize()
await self._initCellBoot()
# we need to know this pretty early...
self.ahasvcname = None
ahaname = self.conf.get('aha:name')
ahanetw = self.conf.get('aha:network')
if ahaname is not None and ahanetw is not None:
self.ahasvcname = f'{ahaname}.{ahanetw}'
# each cell has a guid
path = s_common.genpath(self.dirn, 'cell.guid')
# generate a guid file if needed
if not os.path.isfile(path):
self.inaugural = True
guid = conf.get('cell:guid')
if guid is None:
guid = s_common.guid()
with open(path, 'w') as fd:
fd.write(guid)
# read & lock our guid file
self._cellguidfd = s_common.genfile(path)
self.iden = self._cellguidfd.read().decode().strip()
self._getCellLock()
backdirn = self.conf.get('backup:dir')
if backdirn is not None:
backdirn = s_common.genpath(backdirn)
if backdirn.startswith(self.dirn):
mesg = 'backup:dir must not be within the service directory'
raise s_exc.BadConfValu(mesg=mesg)
else:
backdirn = s_common.genpath(self.dirn, 'backups')
backdirn = s_common.gendir(backdirn)
self.backdirn = backdirn
self.backuprunning = False # Whether a backup is currently running
self.backupstreaming = False # Whether a temporary new backup is currently streaming
self.backmonostart = None # If a backup is running, when did it start (monotonic timer)
self.backstartdt = None # Last backup start time
self.backenddt = None # Last backup end time
self.backlasttook = None # Last backup duration
self.backlastsize = None # Last backup size
self.backlastuploaddt = None # Last time backup completed uploading via iter{New,}BackupArchive
self.backlastexc = None # err, errmsg, errtrace of last backup
if self.conf.get('mirror') and not self.conf.get('nexslog:en'):
self.modCellConf({'nexslog:en': True})
await s_nexus.Pusher.__anit__(self, self.iden)
self._initCertDir()
await self.enter_context(s_telepath.loadTeleCell(self.dirn))
await self._initCellSlab(readonly=readonly)
# initialize network daemons (but do not listen yet)
# to allow registration of callbacks and shared objects
await self._initCellHttp()
await self._initCellDmon()
await self.initServiceEarly()
nexsroot = await self._ctorNexsRoot()
self.setNexsRoot(nexsroot)
async def fini():
await self.nexsroot.fini()
self.onfini(fini)
self.apikeydb = self.slab.initdb('user:apikeys') # apikey -> useriden
self.usermetadb = self.slab.initdb('user:meta') # useriden + <valu> -> dict valu
self.rolemetadb = self.slab.initdb('role:meta') # roleiden + <valu> -> dict valu
# for runtime cell configuration values
self.slab.initdb('cell:conf')
self._sslctx_cache = s_cache.FixedCache(self._makeCachedSslCtx, size=SSLCTX_CACHE_SIZE)
self.hive = await self._initCellHive()
self.cellinfo = self.slab.getSafeKeyVal('cell:info')
self.cellvers = self.slab.getSafeKeyVal('cell:vers')
await self._bumpCellVers('cell:storage', (
(1, self._storCellHiveMigration),
), nexs=False)
if self.inaugural:
self.cellinfo.set('nexus:version', NEXUS_VERSION)
# Check the cell version didn't regress
if (lastver := self.cellinfo.get('cell:version')) is not None and self.VERSION < lastver:
mesg = f'Cell version regression ({self.getCellType()}) is not allowed! Stored version: {lastver}, current version: {self.VERSION}.'
logger.error(mesg)
raise s_exc.BadVersion(mesg=mesg, currver=self.VERSION, lastver=lastver)
self.cellinfo.set('cell:version', self.VERSION)
# Check the synapse version didn't regress
if (lastver := self.cellinfo.get('synapse:version')) is not None and s_version.version < lastver:
mesg = f'Synapse version regression ({self.getCellType()}) is not allowed! Stored version: {lastver}, current version: {s_version.version}.'
logger.error(mesg)
raise s_exc.BadVersion(mesg=mesg, currver=s_version.version, lastver=lastver)
self.cellinfo.set('synapse:version', s_version.version)
self.nexsvers = self.cellinfo.get('nexus:version', (0, 0))
self.nexspatches = ()
await self._bumpCellVers('cell:storage', (
(2, self._storCellAuthMigration),
), nexs=False)
self.auth = await self._initCellAuth()
auth_passwd = self.conf.get('auth:passwd')
if auth_passwd is not None:
user = await self.auth.getUserByName('root')
if not await user.tryPasswd(auth_passwd, nexs=False, enforce_policy=False):
await user.setPasswd(auth_passwd, nexs=False, enforce_policy=False)
self.boss = await s_boss.Boss.anit()
self.onfini(self.boss)
self.dynitems = {
'auth': self.auth,
'cell': self
}
self.permdefs = None
self.permlook = None
# initialize web app and callback data structures
self._health_funcs = []
self.addHealthFunc(self._cellHealth)
if self.conf.get('health:sysctl:checks'):
self.schedCoro(self._runSysctlLoop())
# initialize network backend infrastructure
await self._initAhaRegistry()
# phase 2 - service storage
await self.initCellStorage()
await self.initServiceStorage()
# phase 3 - nexus subsystem
await self.initNexusSubsystem()
await self.configNexsVers()
# We can now do nexus-safe operations
await self._initInauguralConfig()
# phase 4 - service logic
await self.initServiceRuntime()
# phase 5 - service networking
await self.initServiceNetwork()
async def _storCellHiveMigration(self):
logger.warning(f'migrating Cell ({self.getCellType()}) info out of hive')
async with await self.hive.open(('cellvers',)) as versnode:
versdict = await versnode.dict()
for key, valu in versdict.items():
self.cellvers.set(key, valu)
async with await self.hive.open(('cellinfo',)) as infonode:
infodict = await infonode.dict()
for key, valu in infodict.items():
self.cellinfo.set(key, valu)
logger.warning(f'...Cell ({self.getCellType()}) info migration complete!')
async def _storCellAuthMigration(self):
if self.conf.get('auth:ctor') is not None:
return
logger.warning(f'migrating Cell ({self.getCellType()}) auth out of hive')
authkv = self.slab.getSafeKeyVal('auth')
async with await self.hive.open(('auth',)) as rootnode:
rolekv = authkv.getSubKeyVal('role:info:')
rolenamekv = authkv.getSubKeyVal('role:name:')
async with await rootnode.open(('roles',)) as roles:
for iden, node in roles:
roledict = await node.dict()
roleinfo = roledict.pack()
roleinfo['iden'] = iden
roleinfo['name'] = node.valu
roleinfo['authgates'] = {}
roleinfo.setdefault('admin', False)
roleinfo.setdefault('rules', ())
rolekv.set(iden, roleinfo)
rolenamekv.set(node.valu, iden)
userkv = authkv.getSubKeyVal('user:info:')
usernamekv = authkv.getSubKeyVal('user:name:')
async with await rootnode.open(('users',)) as users:
for iden, node in users:
userdict = await node.dict()
userinfo = userdict.pack()
userinfo['iden'] = iden
userinfo['name'] = node.valu
userinfo['authgates'] = {}
userinfo.setdefault('admin', False)
userinfo.setdefault('rules', ())
userinfo.setdefault('locked', False)
userinfo.setdefault('passwd', None)
userinfo.setdefault('archived', False)
realroles = []
for userrole in userinfo.get('roles', ()):
if rolekv.get(userrole) is None:
mesg = f'Unknown role {userrole} on user {iden} during migration, ignoring.'
logger.warning(mesg)
continue
realroles.append(userrole)
userinfo['roles'] = tuple(realroles)
userkv.set(iden, userinfo)
usernamekv.set(node.valu, iden)
varskv = authkv.getSubKeyVal(f'user:{iden}:vars:')
async with await node.open(('vars',)) as varnodes:
for name, varnode in varnodes:
varskv.set(name, varnode.valu)
profkv = authkv.getSubKeyVal(f'user:{iden}:profile:')
async with await node.open(('profile',)) as profnodes:
for name, profnode in profnodes:
profkv.set(name, profnode.valu)
gatekv = authkv.getSubKeyVal('gate:info:')
async with await rootnode.open(('authgates',)) as authgates:
for gateiden, node in authgates:
gateinfo = {
'iden': gateiden,
'type': node.valu
}
gatekv.set(gateiden, gateinfo)
async with await node.open(('users',)) as usernodes:
for useriden, usernode in usernodes:
if (user := userkv.get(useriden)) is None:
mesg = f'Unknown user {useriden} on gate {gateiden} during migration, ignoring.'
logger.warning(mesg)
continue
userinfo = await usernode.dict()
userdict = userinfo.pack()
authkv.set(f'gate:{gateiden}:user:{useriden}', userdict)
user['authgates'][gateiden] = userdict
userkv.set(useriden, user)
async with await node.open(('roles',)) as rolenodes:
for roleiden, rolenode in rolenodes:
if (role := rolekv.get(roleiden)) is None:
mesg = f'Unknown role {roleiden} on gate {gateiden} during migration, ignoring.'
logger.warning(mesg)
continue
roleinfo = await rolenode.dict()
roledict = roleinfo.pack()
authkv.set(f'gate:{gateiden}:role:{roleiden}', roledict)
role['authgates'][gateiden] = roledict
rolekv.set(roleiden, role)
logger.warning(f'...Cell ({self.getCellType()}) auth migration complete!')
def getPermDef(self, perm):
perm = tuple(perm)
if self.permlook is None:
self.permlook = {pdef['perm']: pdef for pdef in self.getPermDefs()}
return self.permlook.get(perm)
def getPermDefs(self):
if self.permdefs is None:
self.permdefs = self._getPermDefs()
return self.permdefs
def _getPermDefs(self):
return ()
def _clearPermDefs(self):
self.permdefs = None
self.permlook = None
def getDmonUser(self):
'''
Return the user iden of the telepath caller who invoked the current task.
(defaults to the iden of the root user)
'''
sess = s_scope.get('sess', None)
if sess is None:
return self.auth.rootuser.iden
if sess.user is None: # pragma: no cover
return self.auth.rootuser.iden
return sess.user.iden
async def fini(self):
'''Fini override that ensures locking teardown order.'''
# we inherit from Pusher to make the Cell a Base subclass
retn = await s_nexus.Pusher.fini(self)
if retn == 0:
self._onFiniCellGuid()
return retn
def _onFiniCellGuid(self):
fcntl.lockf(self._cellguidfd, fcntl.LOCK_UN)
self._cellguidfd.close()
def _getCellLock(self):
cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
try:
fcntl.lockf(self._cellguidfd, cmd)
except BlockingIOError:
# Raised when a lock cannot be obtained on the cell.guid file.
ctyp = self.getCellType()
mesg = f'Cannot start the {ctyp}, another process is already running it.'
raise s_exc.FatalErr(mesg=mesg) from None
async def _onBootOptimize(self):
bdir = s_common.genpath(self.dirn, 'backups')
tdir = s_common.gendir(self.dirn, 'tmp')
tdev = os.stat(tdir).st_dev
logger.warning('Collecting LMDB files for onboot optimization.')
lmdbs = []
for (root, dirs, files) in os.walk(self.dirn):
for dirname in dirs:
filepath = os.path.join(root, dirname, 'data.mdb')
if os.path.isfile(filepath):
if filepath.startswith(bdir):
logger.debug(f'Skipping backup file {filepath}')
continue
if os.stat(filepath).st_dev != tdev:
logger.warning(f'Unable to run onboot:optimize, {filepath} is not on the same volume as {tdir}')
return
lmdbs.append(os.path.join(root, dirname))
if not lmdbs: # pragma: no cover
return
logger.warning('Beginning onboot optimization (this could take a while)...')
size = len(lmdbs)
try:
for i, lmdbpath in enumerate(lmdbs):
logger.warning(f'... {i+1}/{size} {lmdbpath}')
with self.getTempDir() as backpath:
async with await s_lmdbslab.LmdbBackup.anit(lmdbpath) as backup:
await backup.saveto(backpath)
srcpath = os.path.join(lmdbpath, 'data.mdb')
dstpath = os.path.join(backpath, 'data.mdb')
os.rename(dstpath, srcpath)
logger.warning('... onboot optimization complete!')
except Exception as e: # pragma: no cover
logger.exception('...aborting onboot optimization and resuming boot (everything is fine).')
def _delTmpFiles(self):
tdir = s_common.gendir(self.dirn, 'tmp')
names = os.listdir(tdir)
if not names:
return
logger.warning(f'Removing {len(names)} temporary files/folders in: {tdir}')
for name in names:
path = os.path.join(tdir, name)
if os.path.isfile(path):
os.unlink(path)
continue
if os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
continue
async def _execCellUpdates(self):
# implement to apply updates to a fully initialized active cell
# ( and do so using _bumpCellVers )
pass
async def setNexsVers(self, vers):
if self.nexsvers < vers:
await self._push('nexs:vers:set', vers)
@s_nexus.Pusher.onPush('nexs:vers:set')
async def _setNexsVers(self, vers):
if vers > self.nexsvers:
self.cellinfo.set('nexus:version', vers)
self.nexsvers = vers
await self.configNexsVers()
async def configNexsVers(self):
for meth, orig in self.nexspatches:
setattr(self, meth, orig)
if self.nexsvers == NEXUS_VERSION:
return
patches = []
if self.nexsvers < (2, 177):
patches.extend([
('popUserVarValu', self._popUserVarValuV0),
('setUserVarValu', self._setUserVarValuV0),
('popUserProfInfo', self._popUserProfInfoV0),
('setUserProfInfo', self._setUserProfInfoV0),
])
self.nexspatches = []
for meth, repl in patches:
self.nexspatches.append((meth, getattr(self, meth)))
setattr(self, meth, repl)
async def setCellVers(self, name, vers, nexs=True):
if nexs:
await self._push('cell:vers:set', name, vers)
else:
await self._setCellVers(name, vers)
@s_nexus.Pusher.onPush('cell:vers:set')
async def _setCellVers(self, name, vers):
self.cellvers.set(name, vers)
async def _bumpCellVers(self, name, updates, nexs=True):
if self.inaugural:
await self.setCellVers(name, updates[-1][0], nexs=nexs)
return
curv = self.cellvers.get(name, 0)
for vers, callback in updates:
if vers <= curv:
continue
await callback()
await self.setCellVers(name, vers, nexs=nexs)
curv = vers
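# A usage sketch: subclasses drive one-time storage migrations through this
# helper (the name and migration callbacks are illustrative):
#
#     await self._bumpCellVers('widget:storage', (
#         (1, self._migrWidgetsV1),
#         (2, self._migrWidgetsV2),
#     ), nexs=False)
#
# Each callback runs at most once; inaugural services skip straight to the
# latest version without running any callbacks.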
def checkFreeSpace(self):
self._checkspace.set()
async def _runFreeSpaceLoop(self):
while not self.isfini:
nexsroot = self.getCellNexsRoot()
self._checkspace.clear()
disk = shutil.disk_usage(self.dirn)
if (disk.free / disk.total) <= self.minfree:
await nexsroot.addWriteHold(diskspace)
mesg = f'Free space on {self.dirn} below minimum threshold (currently ' \
f'{disk.free / disk.total * 100:.2f}%), setting Cell to read-only.'
logger.error(mesg)
elif nexsroot.readonly and await nexsroot.delWriteHold(diskspace):
mesg = f'Free space on {self.dirn} above minimum threshold (currently ' \
f'{disk.free / disk.total * 100:.2f}%), removing free space write hold.'
logger.error(mesg)
await self._checkspace.timewait(timeout=self.FREE_SPACE_CHECK_FREQ)
async def _runSysctlLoop(self):
while not self.isfini:
fixvals = []
sysctls = s_thisplat.getSysctls()
for name, valu in self.SYSCTL_VALS.items():
if (sysval := sysctls.get(name)) != valu:
fixvals.append({'name': name, 'expected': valu, 'actual': sysval})
if not fixvals:
# All sysctl parameters have been set to recommended values, no
# need to keep checking.
break
fixnames = [k['name'] for k in fixvals]
mesg = f'Sysctl values different than expected: {", ".join(fixnames)}. '
mesg += 'See https://synapse.docs.vertex.link/en/latest/synapse/devopsguide.html#performance-tuning '
mesg += 'for information about these sysctl parameters.'
extra = await self.getLogExtra(sysctls=fixvals)
logger.warning(mesg, extra=extra)
await self.waitfini(self.SYSCTL_CHECK_FREQ)
async def _initAhaRegistry(self):
ahaurls = self.conf.get('aha:registry')
if ahaurls is not None:
await s_telepath.addAhaUrl(ahaurls)
if self.ahaclient is not None:
await self.ahaclient.fini()
async def onlink(proxy):
ahauser = self.conf.get('aha:user', 'root')
newurls = await proxy.getAhaUrls(user=ahauser)
oldurls = self.conf.get('aha:registry')
if isinstance(oldurls, str):
oldurls = (oldurls,)
elif isinstance(oldurls, list):
oldurls = tuple(oldurls)
if newurls and newurls != oldurls:
if oldurls[0].startswith('tcp://'):
s_common.deprecated('aha:registry: tcp:// client values.')
logger.warning('tcp:// based aha:registry options are deprecated and will be removed in Synapse v3.0.0')
return
self.modCellConf({'aha:registry': newurls})
self.ahaclient.setBootUrls(newurls)
self.ahaclient = await s_telepath.Client.anit(ahaurls, onlink=onlink)
self.onfini(self.ahaclient)
async def fini():
await s_telepath.delAhaUrl(ahaurls)
self.ahaclient.onfini(fini)
ahaadmin = self.conf.get('aha:admin')
ahauser = self.conf.get('aha:user')
if ahaadmin is not None:
await self._addAdminUser(ahaadmin)
if ahauser is not None:
await self._addAdminUser(ahauser)
def _getDmonListen(self):
lisn = self.conf.get('dmon:listen', s_common.novalu)
if lisn is not s_common.novalu:
return lisn
ahaname = self.conf.get('aha:name')
ahanetw = self.conf.get('aha:network')
if ahaname is not None and ahanetw is not None:
hostname = f'{ahaname}.{ahanetw}'
return f'ssl://0.0.0.0:0?hostname={hostname}&ca={ahanetw}'
async def _addAdminUser(self, username):
# add the user in a pre-nexus compatible way
user = await self.auth.getUserByName(username)
if user is None:
iden = s_common.guid(username)
await self.auth._addUser(iden, username)
user = await self.auth.getUserByName(username)
if not user.isAdmin():
await user.setAdmin(True, logged=False)
if user.isLocked():
await user.setLocked(False, logged=False)
async def initServiceEarly(self):
pass
async def initCellStorage(self):
self.drive = await s_drive.Drive.anit(self.slab, 'celldrive')
self.onfini(self.drive.fini)
async def addDriveItem(self, info, path=None, reldir=s_drive.rootdir):
iden = info.get('iden')
if iden is None:
info['iden'] = s_common.guid()
info.setdefault('created', s_common.now())
info.setdefault('creator', self.auth.rootuser.iden)
return await self._push('drive:add', info, path=path, reldir=reldir)
@s_nexus.Pusher.onPush('drive:add')
async def _addDriveItem(self, info, path=None, reldir=s_drive.rootdir):
# replay safety...
iden = info.get('iden')
if self.drive.hasItemInfo(iden): # pragma: no cover
return await self.drive.getItemPath(iden)
return await self.drive.addItemInfo(info, path=path, reldir=reldir)
async def getDriveInfo(self, iden, typename=None):
return self.drive.getItemInfo(iden, typename=typename)
def reqDriveInfo(self, iden, typename=None):
return self.drive.reqItemInfo(iden, typename=typename)
async def getDrivePath(self, path, reldir=s_drive.rootdir):
'''
Return a list of drive info elements for each step in path.
This may be used as a sort of "open" which returns all the
path info entries. You may then operate directly on drive iden
entries and/or check easyperm entries on them before you do...
'''
return await self.drive.getPathInfo(path, reldir=reldir)
async def addDrivePath(self, path, perm=None, reldir=s_drive.rootdir):
'''
Create the given path using the specified permissions.
The specified permissions are only used when creating new directories.
NOTE: We must do this outside the Drive class to allow us to generate
iden and tick but remain nexus compatible.
'''
tick = s_common.now()
user = self.auth.rootuser.iden
path = self.drive.getPathNorm(path)
if perm is None:
perm = {'users': {}, 'roles': {}}
for name in path:
info = self.drive.getStepInfo(reldir, name)
await asyncio.sleep(0)
if info is not None:
reldir = info.get('iden')
continue
info = {
'name': name,
'perm': perm,
'iden': s_common.guid(),
'created': tick,
'creator': user,
}
pathinfo = await self.addDriveItem(info, reldir=reldir)
reldir = pathinfo[-1].get('iden')
return await self.drive.getItemPath(reldir)
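# A usage sketch (the path and permission dict are illustrative; the perm
# dict follows the easyperm users/roles shape, using the PERM_* levels
# defined at the top of this module):
#
#     perm = {'users': {useriden: PERM_EDIT}, 'roles': {}}
#     pathinfo = await cell.addDrivePath('service/configs', perm=perm)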
async def getDriveData(self, iden, vers=None):
'''
Return the data associated with the drive item by iden.
If vers is specified, return that specific version.
'''
return self.drive.getItemData(iden, vers=vers)
async def getDriveDataVersions(self, iden):
async for item in self.drive.getItemDataVersions(iden):
yield item
@s_nexus.Pusher.onPushAuto('drive:del')
async def delDriveInfo(self, iden):
if self.drive.getItemInfo(iden) is not None:
await self.drive.delItemInfo(iden)
@s_nexus.Pusher.onPushAuto('drive:set:perm')
async def setDriveInfoPerm(self, iden, perm):
return self.drive.setItemPerm(iden, perm)
@s_nexus.Pusher.onPushAuto('drive:set:path')
async def setDriveInfoPath(self, iden, path):
path = self.drive.getPathNorm(path)
pathinfo = await self.drive.getItemPath(iden)
if path == [p.get('name') for p in pathinfo]:
return pathinfo
return await self.drive.setItemPath(iden, path)
@s_nexus.Pusher.onPushAuto('drive:data:set')
async def setDriveData(self, iden, versinfo, data):
return await self.drive.setItemData(iden, versinfo, data)
async def delDriveData(self, iden, vers=None):
if vers is None:
info = self.drive.reqItemInfo(iden)
vers = info.get('version')
return await self._push('drive:data:del', iden, vers)
@s_nexus.Pusher.onPush('drive:data:del')
async def _delDriveData(self, iden, vers):
return self.drive.delItemData(iden, vers)
async def getDriveKids(self, iden):
async for info in self.drive.getItemKids(iden):
yield info
async def initServiceStorage(self):
pass
async def initNexusSubsystem(self):
if self.cellparent is None:
await self.nexsroot.recover()
await self.nexsroot.startup()
await self.setCellActive(self.conf.get('mirror') is None)
if self.minfree is not None:
self.schedCoro(self._runFreeSpaceLoop())
async def _bindDmonListen(self):
# functionalized so downstream code can bind early.
if self.sockaddr is not None:
return
# start a unix local socket daemon listener
sockpath = os.path.join(self.dirn, 'sock')
sockurl = f'unix://{sockpath}'
try:
await self.dmon.listen(sockurl)
except OSError as e:
logger.error(f'Failed to listen on unix socket at: [{sockpath}][{e}]')
logger.error('LOCAL UNIX SOCKET WILL BE UNAVAILABLE')
except Exception: # pragma: no cover
logger.exception('Unknown dmon listen error.')
turl = self._getDmonListen()
if turl is not None:
logger.info(f'dmon listening: {turl}')
self.sockaddr = await self.dmon.listen(turl)
async def initServiceNetwork(self):
await self._bindDmonListen()
await self._initAhaService()
self.netready.set()
port = self.conf.get('https:port')
if port is not None:
await self.addHttpsPort(port)
logger.info(f'https listening: {port}')
async def getAhaInfo(self):
# Default to static information
ahainfo = self.conf.get('aha:svcinfo')
if ahainfo is not None:
return ahainfo
# If we have not setup our dmon listener yet, do not generate ahainfo
if self.sockaddr is None:
return None
turl = self.conf.get('dmon:listen')
# Dynamically generate the aha info based on config and runtime data.
urlinfo = s_telepath.chopurl(turl)
urlinfo.pop('host', None)
if isinstance(self.sockaddr, tuple):
urlinfo['port'] = self.sockaddr[1]
celliden = self.getCellIden()
runiden = await self.getCellRunId()
ahalead = self.conf.get('aha:leader')
# If we are active, then we are ready by definition.
# If we are not active, then we have to check the nexsroot
# and see if the nexsroot is marked as ready or not. This
# status is set on mirrors when they have entered into the
# real-time change window.
if self.isactive:
ready = True
else:
ready = await self.nexsroot.isNexsReady()
ahainfo = {
'run': runiden,
'iden': celliden,
'leader': ahalead,
'urlinfo': urlinfo,
'ready': ready,
}
return ahainfo
async def _initAhaService(self):
if self.ahaclient is None:
return
ahaname = self.conf.get('aha:name')
if ahaname is None:
return
ahanetw = self.conf.get('aha:network')
if ahanetw is None:
return
ahainfo = await self.getAhaInfo()
if ahainfo is None:
return
ahalead = self.conf.get('aha:leader')
self.ahasvcname = f'{ahaname}.{ahanetw}'
async def _runAhaRegLoop():
while not self.isfini:
try:
proxy = await self.ahaclient.proxy()
info = await self.getAhaInfo()
await proxy.addAhaSvc(ahaname, info, network=ahanetw)
if self.isactive and ahalead is not None:
await proxy.addAhaSvc(ahalead, info, network=ahanetw)
except Exception as e:
logger.exception(f'Error registering service {self.ahasvcname} with AHA: {e}')
await self.waitfini(1)
else:
await proxy.waitfini()
self.schedCoro(_runAhaRegLoop())
async def initServiceRuntime(self):
pass
async def _ctorNexsRoot(self):
'''
Initialize a NexsRoot to use for the cell.
'''
if self.cellparent:
return self.cellparent.nexsroot
return await s_nexus.NexsRoot.anit(self)
async def getNexsIndx(self):
return await self.nexsroot.index()
async def cullNexsLog(self, offs):
if self.backuprunning:
raise s_exc.SlabInUse(mesg='Cannot cull Nexus log while a backup is running')
return await self._push('nexslog:cull', offs)
@s_nexus.Pusher.onPush('nexslog:cull')
async def _cullNexsLog(self, offs):
return await self.nexsroot.cull(offs)
async def rotateNexsLog(self):
if self.backuprunning:
raise s_exc.SlabInUse(mesg='Cannot rotate Nexus log while a backup is running')
return await self._push('nexslog:rotate')
@s_nexus.Pusher.onPush('nexslog:rotate')
async def _rotateNexsLog(self):
return await self.nexsroot.rotate()
async def waitNexsOffs(self, offs, timeout=None):
return await self.nexsroot.waitOffs(offs, timeout=timeout)
async def trimNexsLog(self, consumers=None, timeout=30):
if not self.conf.get('nexslog:en'):
mesg = 'trimNexsLog requires nexslog:en=True'
raise s_exc.BadConfValu(mesg=mesg)
async with await s_base.Base.anit() as base:
if consumers is not None:
cons_opened = []
for turl in consumers:
prox = await s_telepath.openurl(turl)
base.onfini(prox.fini)
cons_opened.append((s_urlhelp.sanitizeUrl(turl), prox))
offs = await self.rotateNexsLog()
cullat = offs - 1
# wait for all consumers to catch up and raise otherwise
if consumers is not None:
async def waitFor(turl_sani, prox_):
logger.debug('trimNexsLog waiting for consumer %s to write offset %d', turl_sani, cullat)
if not await prox_.waitNexsOffs(cullat, timeout=timeout):
mesg_ = 'trimNexsLog timed out waiting for consumer to write rotation offset'
raise s_exc.SynErr(mesg=mesg_, offs=cullat, timeout=timeout, url=turl_sani)
logger.info('trimNexsLog consumer %s successfully wrote offset', turl_sani)
await asyncio.gather(*[waitFor(*cons) for cons in cons_opened])
if not await self.cullNexsLog(cullat):
mesg = 'trimNexsLog did not execute cull at the rotated offset'
raise s_exc.SynErr(mesg=mesg, offs=cullat)
return cullat
@s_nexus.Pusher.onPushAuto('nexslog:setindex')
async def setNexsIndx(self, indx):
return await self.nexsroot.setindex(indx)
def getMyUrl(self, user='root'):
host = self.conf.req('aha:name')
network = self.conf.req('aha:network')
return f'aha://{user}@{host}.{network}'
async def handoff(self, turl, timeout=30):
'''
Hand off leadership to a mirror in a transactional fashion.
'''
_dispname = f' ahaname={self.conf.get("aha:name")}' if self.conf.get('aha:name') else ''
if not self.isactive:
mesg = f'HANDOFF: {_dispname} is not the current leader and cannot handoff leadership to' \
f' {s_urlhelp.sanitizeUrl(turl)}.'
logger.error(mesg)
raise s_exc.BadState(mesg=mesg, turl=turl, cursvc=_dispname)
logger.warning(f'HANDOFF: Performing leadership handoff to {s_urlhelp.sanitizeUrl(turl)}{_dispname}.')
async with await s_telepath.openurl(turl) as cell:
logger.debug(f'HANDOFF: Connected to {s_urlhelp.sanitizeUrl(turl)}{_dispname}.')
if self.iden != await cell.getCellIden(): # pragma: no cover
mesg = 'Mirror handoff remote cell iden does not match!'
raise s_exc.BadArg(mesg=mesg)
if self.runid == await cell.getCellRunId(): # pragma: no cover
mesg = 'Cannot handoff mirror leadership to myself!'
raise s_exc.BadArg(mesg=mesg)
logger.debug(f'HANDOFF: Obtaining nexus lock{_dispname}.')
async with self.nexslock:
logger.debug(f'HANDOFF: Obtained nexus lock{_dispname}.')
indx = await self.getNexsIndx()
logger.debug(f'HANDOFF: Waiting {timeout} seconds for mirror to reach {indx=}{_dispname}.')
if not await cell.waitNexsOffs(indx - 1, timeout=timeout): # pragma: no cover
mndx = await cell.getNexsIndx()
mesg = f'Remote mirror did not catch up in time: {mndx}/{indx}.'
raise s_exc.NotReady(mesg=mesg)
logger.debug(f'HANDOFF: Mirror has caught up to the current leader, performing promotion{_dispname}.')
await cell.promote()
logger.debug(f'HANDOFF: Setting the service as inactive{_dispname}.')
await self.setCellActive(False)
logger.debug(f'HANDOFF: Configuring service to sync from new leader{_dispname}.')
self.modCellConf({'mirror': turl})
logger.debug(f'HANDOFF: Restarting the nexus{_dispname}.')
await self.nexsroot.startup()
logger.debug(f'HANDOFF: Released nexus lock{_dispname}.')
logger.warning(f'HANDOFF: Done performing the leadership handoff with {s_urlhelp.sanitizeUrl(turl)}{_dispname}.')
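# Hedged example (hypothetical AHA URL): promote a mirror to leader from the
# current leader. handoff() raises BadState when called on a non-leader, so
# callers should route this call to the active service.
#
#     await cell.handoff('aha://01.mysvc.example.net', timeout=60)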
[docs]
async def reqAhaProxy(self, timeout=None):
if self.ahaclient is None:
mesg = 'AHA is not configured on this service.'
raise s_exc.NeedConfValu(mesg=mesg)
return await self.ahaclient.proxy(timeout=timeout)
async def _setAhaActive(self):
if self.ahaclient is None:
return
ahainfo = await self.getAhaInfo()
if ahainfo is None:
return
ahalead = self.conf.get('aha:leader')
if ahalead is None:
return
try:
proxy = await self.ahaclient.proxy(timeout=2)
except TimeoutError: # pragma: no cover
return None
# if we went inactive, bump the aha proxy
if not self.isactive:
await proxy.fini()
return
ahanetw = self.conf.req('aha:network')
try:
await proxy.addAhaSvc(ahalead, ahainfo, network=ahanetw)
except asyncio.CancelledError: # pragma: no cover
raise
except Exception as e: # pragma: no cover
logger.warning(f'_setAhaActive failed: {e}')
[docs]
def isActiveCoro(self, iden):
return self.activecoros.get(iden) is not None
[docs]
def addActiveCoro(self, func, iden=None, base=None):
'''
Add a function callback to be run as a coroutine when the Cell is active.
Args:
func (coroutine function): The function run as a coroutine.
iden (str): The iden to use for the coroutine.
base (Optional[Base]): if present, this active coro will be fini'd
when the base is fini'd
Returns:
str: A GUID string that identifies the coroutine for delActiveCoro()
NOTE:
This will re-fire the coroutine if it exits and the Cell is still active.
'''
if base and base.isfini:
raise s_exc.IsFini()
if iden is None:
iden = s_common.guid()
cdef = {'func': func, 'base': base}
self.activecoros[iden] = cdef
if base:
async def fini():
await self.delActiveCoro(iden)
base.onfini(fini)
if self.isactive:
self._fireActiveCoro(iden, cdef)
return iden
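# Sketch of the active coroutine contract, assuming `cell` is a running Cell:
# the callback is re-fired if it returns while the cell remains active, so
# long-lived work should loop inside the coroutine itself. `dowork` is a
# hypothetical work function.
#
#     async def syncloop():
#         while True:
#             await dowork()
#             await asyncio.sleep(30)
#
#     iden = cell.addActiveCoro(syncloop)
#     # later, when the loop is no longer needed:
#     await cell.delActiveCoro(iden)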
[docs]
async def delActiveCoro(self, iden):
'''
Remove an Active coroutine previously added with addActiveCoro().
Args:
iden (str): The iden returned by addActiveCoro()
'''
cdef = self.activecoros.pop(iden, None)
if cdef is None:
return
await self._killActiveCoro(cdef)
def _fireActiveCoros(self):
for iden, cdef in self.activecoros.items():
self._fireActiveCoro(iden, cdef)
def _fireActiveCoro(self, iden, cdef):
func = cdef.get('func')
async def wrap():
while not self.isfini and self.isActiveCoro(iden):
try:
await func()
except Exception: # pragma: no cover
logger.exception(f'activeCoro Error: {func}')
await self.waitfini(1)
cdef['task'] = self.schedCoro(wrap())
async def _killActiveCoros(self):
coros = [self._killActiveCoro(cdef) for cdef in self.activecoros.values()]
await asyncio.gather(*coros, return_exceptions=True)
async def _killActiveCoro(self, cdef):
task = cdef.pop('task', None)
if task is not None:
task.cancel()
try:
retn = await asyncio.gather(task, return_exceptions=True)
if isinstance(retn[0], Exception):
raise retn[0]
except asyncio.CancelledError:
pass
except Exception: # pragma: no cover
logger.exception(f'Error tearing down activecoro for {task}')
[docs]
async def isCellActive(self):
return self.isactive
[docs]
async def setCellActive(self, active):
if active == self.isactive:
return
self.isactive = active
if self.isactive:
self.activebase = await s_base.Base.anit()
self.onfini(self.activebase)
self._fireActiveCoros()
await self._execCellUpdates()
await self.setNexsVers(NEXUS_VERSION)
await self.initServiceActive()
else:
await self._killActiveCoros()
await self.activebase.fini()
self.activebase = None
await self.initServicePassive()
await self._setAhaActive()
[docs]
def runActiveTask(self, coro):
# an API for active coroutines to use when running an
# ephemeral task which should be automatically torn down
# if the cell becomes inactive
return self.activebase.schedCoro(coro)
[docs]
async def initServiceActive(self): # pragma: no cover
pass
[docs]
async def initServicePassive(self): # pragma: no cover
pass
[docs]
async def getNexusChanges(self, offs, tellready=False):
async for item in self.nexsroot.iter(offs, tellready=tellready):
yield item
def _reqBackDirn(self, name):
self._reqBackConf()
path = s_common.genpath(self.backdirn, name)
if not path.startswith(self.backdirn):
mesg = 'Directory traversal detected'
raise s_exc.BadArg(mesg=mesg)
return path
def _reqBackupSpace(self):
disk = shutil.disk_usage(self.backdirn)
cellsize, _ = s_common.getDirSize(self.dirn)
if os.stat(self.dirn).st_dev == os.stat(self.backdirn).st_dev:
reqspace = self.minfree * disk.total + cellsize
else:
reqspace = cellsize
if reqspace > disk.free:
mesg = f'Insufficient free space on {self.backdirn} to run a backup ' \
f'({disk.free} bytes free, {reqspace} required)'
raise s_exc.LowSpace(mesg=mesg, dirn=self.backdirn)
[docs]
async def runBackup(self, name=None, wait=True):
if self.backuprunning:
raise s_exc.BackupAlreadyRunning(mesg='Another backup is already running')
try:
task = None
backupstartdt = datetime.datetime.now()
if name is None:
name = time.strftime('%Y%m%d%H%M%S', backupstartdt.timetuple())
path = self._reqBackDirn(name)
if os.path.isdir(path):
mesg = 'Backup with name already exists'
raise s_exc.BadArg(mesg=mesg)
self._reqBackupSpace()
self.backuprunning = True
self.backlastexc = None
self.backmonostart = time.monotonic()
self.backstartdt = backupstartdt
task = self.schedCoro(self._execBackupTask(path))
def done(self, task):
try:
self.backlastsize, _ = s_common.getDirSize(path)
except s_exc.NoSuchDir:
pass
self.backlasttook = time.monotonic() - self.backmonostart
self.backenddt = datetime.datetime.now()
self.backmonostart = None
try:
exc = task.exception()
except asyncio.CancelledError as e:
exc = e
if exc:
self.backlastexc = s_common.excinfo(exc)
self.backlastsize = None
else:
self.backlastexc = None
try:
self.backlastsize, _ = s_common.getDirSize(path)
except Exception:
self.backlastsize = None
self.backuprunning = False
phrase = f'with exception {self.backlastexc["errmsg"]}' if self.backlastexc else 'successfully'
logger.info(f'Backup {name} completed {phrase}. Took {self.backlasttook:.2f} s')
task.add_done_callback(functools.partial(done, self))
if wait:
logger.info(f'Waiting for backup task to complete [{name}]')
await task
return name
except (asyncio.CancelledError, Exception):
if task is not None:
task.cancel()
raise
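# Hedged usage sketch: run a named backup and inspect the outcome. This
# assumes the backup:dir config option is set; the name is illustrative.
#
#     name = await cell.runBackup(name='20240101000000', wait=True)
#     info = await cell.getBackupInfo()
#     if info.get('lastexception') is None:
#         print(f'backup {name} took {info["lastduration"]} ms')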
[docs]
async def getBackupInfo(self):
'''
Gets information about recent backup activity
'''
running = int((time.monotonic() - self.backmonostart) * 1000) if self.backmonostart else None
retn = {
'currduration': running,
'laststart': int(self.backstartdt.timestamp() * 1000) if self.backstartdt else None,
'lastend': int(self.backenddt.timestamp() * 1000) if self.backenddt else None,
'lastduration': int(self.backlasttook * 1000) if self.backlasttook else None,
'lastsize': self.backlastsize,
'lastupload': int(self.backlastuploaddt.timestamp() * 1000) if self.backlastuploaddt else None,
'lastexception': self.backlastexc,
}
return retn
async def _execBackupTask(self, dirn):
'''
A task that backs up the cell to the target directory
'''
logger.info(f'Starting backup to [{dirn}]')
await self.boss.promote('backup', self.auth.rootuser)
slabs = s_lmdbslab.Slab.getSlabsInDir(self.dirn)
assert slabs
ctx = multiprocessing.get_context('spawn')
mypipe, child_pipe = ctx.Pipe()
paths = [str(slab.path) for slab in slabs]
logconf = await self._getSpawnLogConf()
proc = None
try:
async with self.nexslock:
logger.debug('Syncing LMDB Slabs')
while True:
await s_lmdbslab.Slab.syncLoopOnce()
if not any(slab.dirty for slab in slabs):
break
logger.debug('Starting backup process')
# Copy cell.guid first to ensure the backup can be deleted
dstdir = s_common.gendir(dirn)
shutil.copy(os.path.join(self.dirn, 'cell.guid'), os.path.join(dstdir, 'cell.guid'))
args = (child_pipe, self.dirn, dirn, paths, logconf)
def waitforproc1():
nonlocal proc
proc = ctx.Process(target=self._backupProc, args=args)
proc.start()
hasdata = mypipe.poll(timeout=self.BACKUP_SPAWN_TIMEOUT)
if not hasdata:
raise s_exc.SynErr(mesg='backup subprocess start timed out')
data = mypipe.recv()
assert data == 'captured'
await s_coro.executor(waitforproc1)
def waitforproc2():
proc.join()
if proc.exitcode:
raise s_exc.SpawnExit(code=proc.exitcode)
await s_coro.executor(waitforproc2)
proc = None
logger.info(f'Backup completed to [{dirn}]')
return
except (asyncio.CancelledError, Exception):
logger.exception(f'Error performing backup to [{dirn}]')
raise
finally:
if proc:
proc.terminate()
@staticmethod
def _backupProc(pipe, srcdir, dstdir, lmdbpaths, logconf):
'''
(In a separate process) Actually do the backup
'''
# This is a new process: configure logging
s_common.setlogging(logger, **logconf)
try:
with s_t_backup.capturelmdbs(srcdir) as lmdbinfo:
pipe.send('captured')
logger.debug('Acquired LMDB transactions')
s_t_backup.txnbackup(lmdbinfo, srcdir, dstdir)
except Exception:
logger.exception(f'Error running backup of {srcdir}')
raise
logger.debug('Backup process completed')
def _reqBackConf(self):
if self.backdirn is None:
mesg = 'Backup APIs require the backup:dir config option to be set'
raise s_exc.NeedConfValu(mesg=mesg)
[docs]
async def delBackup(self, name):
self._reqBackConf()
path = self._reqBackDirn(name)
cellguid = os.path.join(path, 'cell.guid')
if not os.path.isfile(cellguid):
mesg = 'Specified backup path has no cell.guid file.'
raise s_exc.BadArg(mesg=mesg)
logger.info(f'Removing backup for [{path}]')
await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
logger.info(f'Backup removed from [{path}]')
[docs]
async def getBackups(self):
self._reqBackConf()
backups = []
def walkpath(path):
for name in os.listdir(path):
full = os.path.join(path, name)
cellguid = os.path.join(full, 'cell.guid')
if os.path.isfile(cellguid):
backups.append(os.path.relpath(full, self.backdirn))
continue
if os.path.isdir(full):
walkpath(full)
walkpath(self.backdirn)
return backups
[docs]
async def iterBackupArchive(self, name, user):
success = False
loglevel = logging.WARNING
path = self._reqBackDirn(name)
cellguid = os.path.join(path, 'cell.guid')
if not os.path.isfile(cellguid):
mesg = 'Specified backup path has no cell.guid file.'
raise s_exc.BadArg(mesg=mesg, arg='path', valu=path)
link = s_scope.get('link')
linkinfo = await link.getSpawnInfo()
linkinfo['logconf'] = await self._getSpawnLogConf()
await self.boss.promote('backup:stream', user=user, info={'name': name})
ctx = multiprocessing.get_context('spawn')
proc = None
mesg = 'Streaming complete'
def getproc():
proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
proc.start()
return proc
try:
proc = await s_coro.executor(getproc)
await s_coro.executor(proc.join)
except (asyncio.CancelledError, Exception) as e:
# We want to log all exceptions here; an asyncio.CancelledError
# could be the result of a remote link terminating because the
# backup stream completed before this function finished.
logger.exception('Error during backup streaming.')
if proc:
proc.terminate()
mesg = repr(e)
raise
else:
success = True
loglevel = logging.DEBUG
self.backlastuploaddt = datetime.datetime.now()
finally:
phrase = 'successfully' if success else 'with failure'
logger.log(loglevel, f'iterBackupArchive completed {phrase} for {name}')
raise s_exc.DmonSpawn(mesg=mesg)
[docs]
async def iterNewBackupArchive(self, user, name=None, remove=False):
if self.backupstreaming:
raise s_exc.BackupAlreadyRunning(mesg='Another streaming backup is already running')
try:
if remove:
self.backupstreaming = True
success = False
loglevel = logging.WARNING
if name is None:
name = time.strftime('%Y%m%d%H%M%S', datetime.datetime.now().timetuple())
path = self._reqBackDirn(name)
if os.path.isdir(path):
mesg = 'Backup with name already exists'
raise s_exc.BadArg(mesg=mesg)
link = s_scope.get('link')
linkinfo = await link.getSpawnInfo()
linkinfo['logconf'] = await self._getSpawnLogConf()
try:
await self.runBackup(name)
except Exception:
if remove:
logger.debug(f'Removing {path}')
await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
logger.debug(f'Removed {path}')
raise
await self.boss.promote('backup:stream', user=user, info={'name': name})
ctx = multiprocessing.get_context('spawn')
proc = None
mesg = 'Streaming complete'
def getproc():
proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo))
proc.start()
return proc
try:
proc = await s_coro.executor(getproc)
await s_coro.executor(proc.join)
except (asyncio.CancelledError, Exception) as e:
# We want to log all exceptions here; an asyncio.CancelledError
# could be the result of a remote link terminating because the
# backup stream completed before this function finished.
logger.exception('Error during backup streaming.')
if proc:
proc.terminate()
mesg = repr(e)
raise
else:
success = True
loglevel = logging.DEBUG
self.backlastuploaddt = datetime.datetime.now()
finally:
if remove:
logger.debug(f'Removing {path}')
await s_coro.executor(shutil.rmtree, path, ignore_errors=True)
logger.debug(f'Removed {path}')
phrase = 'successfully' if success else 'with failure'
logger.log(loglevel, f'iterNewBackupArchive completed {phrase} for {name}')
raise s_exc.DmonSpawn(mesg=mesg)
finally:
if remove:
self.backupstreaming = False
[docs]
async def isUserAllowed(self, iden, perm, gateiden=None, default=False):
user = self.auth.user(iden) # type: s_auth.User
if user is None:
return False
return user.allowed(perm, default=default, gateiden=gateiden)
[docs]
async def isRoleAllowed(self, iden, perm, gateiden=None):
role = self.auth.role(iden)
if role is None:
return False
return role.allowed(perm, gateiden=gateiden)
[docs]
async def tryUserPasswd(self, name, passwd):
user = await self.auth.getUserByName(name)
if user is None:
return None
if not await user.tryPasswd(passwd):
return None
return user.pack()
[docs]
async def getUserProfile(self, iden):
user = await self.auth.reqUser(iden)
return dict(user.profile.items())
[docs]
async def getUserProfInfo(self, iden, name, default=None):
user = await self.auth.reqUser(iden)
return user.profile.get(name, defv=default)
async def _setUserProfInfoV0(self, iden, name, valu):
path = ('auth', 'users', iden, 'profile', name)
return await self.hive._push('hive:set', path, valu)
[docs]
async def setUserProfInfo(self, iden, name, valu):
user = await self.auth.reqUser(iden)
return await user.setProfileValu(name, valu)
async def _popUserProfInfoV0(self, iden, name, default=None):
path = ('auth', 'users', iden, 'profile', name)
return await self.hive._push('hive:pop', path)
[docs]
async def popUserProfInfo(self, iden, name, default=None):
user = await self.auth.reqUser(iden)
return await user.popProfileValu(name, default=default)
[docs]
async def iterUserVars(self, iden):
user = await self.auth.reqUser(iden)
for item in user.vars.items():
yield item
await asyncio.sleep(0)
[docs]
async def iterUserProfInfo(self, iden):
user = await self.auth.reqUser(iden)
for item in user.profile.items():
yield item
await asyncio.sleep(0)
[docs]
async def getUserVarValu(self, iden, name, default=None):
user = await self.auth.reqUser(iden)
return user.vars.get(name, defv=default)
async def _setUserVarValuV0(self, iden, name, valu):
path = ('auth', 'users', iden, 'vars', name)
return await self.hive._push('hive:set', path, valu)
[docs]
async def setUserVarValu(self, iden, name, valu):
user = await self.auth.reqUser(iden)
return await user.setVarValu(name, valu)
async def _popUserVarValuV0(self, iden, name, default=None):
path = ('auth', 'users', iden, 'vars', name)
return await self.hive._push('hive:pop', path)
[docs]
async def popUserVarValu(self, iden, name, default=None):
user = await self.auth.reqUser(iden)
return await user.popVarValu(name, default=default)
[docs]
async def addUserRule(self, iden, rule, indx=None, gateiden=None):
user = await self.auth.reqUser(iden)
retn = await user.addRule(rule, indx=indx, gateiden=gateiden)
logger.info(f'Added rule={rule} on user {user.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
rule=rule, gateiden=gateiden, status='MODIFY'))
return retn
[docs]
async def addRoleRule(self, iden, rule, indx=None, gateiden=None):
role = await self.auth.reqRole(iden)
retn = await role.addRule(rule, indx=indx, gateiden=gateiden)
logger.info(f'Added rule={rule} on role {role.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_role=role.iden, target_rolename=role.name,
rule=rule, gateiden=gateiden, status='MODIFY'))
return retn
[docs]
async def delUserRule(self, iden, rule, gateiden=None):
user = await self.auth.reqUser(iden)
logger.info(f'Removing rule={rule} on user {user.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
rule=rule, gateiden=gateiden, status='MODIFY'))
return await user.delRule(rule, gateiden=gateiden)
[docs]
async def delRoleRule(self, iden, rule, gateiden=None):
role = await self.auth.reqRole(iden)
logger.info(f'Removing rule={rule} on role {role.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_role=role.iden, target_rolename=role.name,
rule=rule, gateiden=gateiden, status='MODIFY'))
return await role.delRule(rule, gateiden=gateiden)
[docs]
async def setUserRules(self, iden, rules, gateiden=None):
user = await self.auth.reqUser(iden)
await user.setRules(rules, gateiden=gateiden)
logger.info(f'Set user rules = {rules} on user {user.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
rules=rules, gateiden=gateiden, status='MODIFY'))
[docs]
async def setRoleRules(self, iden, rules, gateiden=None):
role = await self.auth.reqRole(iden)
await role.setRules(rules, gateiden=gateiden)
logger.info(f'Set role rules = {rules} on role {role.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_role=role.iden, target_rolename=role.name,
rules=rules, gateiden=gateiden, status='MODIFY'))
[docs]
async def setRoleName(self, iden, name):
role = await self.auth.reqRole(iden)
oname = role.name
await role.setName(name)
logger.info(f'Set name={name} from {oname} on role iden={role.iden}',
extra=await self.getLogExtra(target_role=role.iden, target_rolename=role.name,
status='MODIFY'))
[docs]
async def setUserAdmin(self, iden, admin, gateiden=None):
user = await self.auth.reqUser(iden)
await user.setAdmin(admin, gateiden=gateiden)
logger.info(f'Set admin={admin} for {user.name} for gateiden={gateiden}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
gateiden=gateiden, status='MODIFY'))
[docs]
async def addUserRole(self, useriden, roleiden, indx=None):
user = await self.auth.reqUser(useriden)
role = await self.auth.reqRole(roleiden)
await user.grant(roleiden, indx=indx)
logger.info(f'Granted role {role.name} to user {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
target_role=role.iden, target_rolename=role.name,
status='MODIFY'))
[docs]
async def setUserRoles(self, useriden, roleidens):
user = await self.auth.reqUser(useriden)
await user.setRoles(roleidens)
logger.info(f'Set roleidens={roleidens} on user {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
roleidens=roleidens, status='MODIFY'))
[docs]
async def delUserRole(self, useriden, roleiden):
user = await self.auth.reqUser(useriden)
role = await self.auth.reqRole(roleiden)
await user.revoke(roleiden)
logger.info(f'Revoked role {role.name} from user {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
target_role=role.iden, target_rolename=role.name,
status='MODIFY'))
[docs]
async def addUser(self, name, passwd=None, email=None, iden=None):
user = await self.auth.addUser(name, passwd=passwd, email=email, iden=iden)
logger.info(f'Added user={name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name,
status='CREATE'))
return user.pack(packroles=True)
[docs]
async def delUser(self, iden):
user = await self.auth.reqUser(iden)
name = user.name
await self.auth.delUser(iden)
logger.info(f'Deleted user={name}',
extra=await self.getLogExtra(target_user=iden, target_username=name, status='DELETE'))
[docs]
async def addRole(self, name, iden=None):
role = await self.auth.addRole(name, iden=iden)
logger.info(f'Added role={name}',
extra=await self.getLogExtra(target_role=role.iden, target_rolename=role.name, status='CREATE'))
return role.pack()
[docs]
async def delRole(self, iden):
role = await self.auth.reqRole(iden)
name = role.name
await self.auth.delRole(iden)
logger.info(f'Deleted role={name}',
extra=await self.getLogExtra(target_role=iden, target_rolename=name, status='DELETE'))
[docs]
async def setUserEmail(self, useriden, email):
await self.auth.setUserInfo(useriden, 'email', email)
user = await self.auth.reqUser(useriden)
logger.info(f'Set email={email} for {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
[docs]
async def setUserName(self, useriden, name):
user = await self.auth.reqUser(useriden)
oname = user.name
await user.setName(name)
logger.info(f'Set name={name} from {oname} on user iden={user.iden}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
[docs]
async def setUserPasswd(self, iden, passwd):
user = await self.auth.reqUser(iden)
await user.setPasswd(passwd)
logger.info(f'Set password for {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
[docs]
async def genUserOnepass(self, iden, duration=600000):
user = await self.auth.reqUser(iden)
passwd = s_common.guid()
shadow = await s_passwd.getShadowV2(passwd=passwd)
now = s_common.now()
onepass = {'create': now, 'expires': s_common.now() + duration,
'shadow': shadow}
await self.auth.setUserInfo(iden, 'onepass', onepass)
logger.info(f'Issued one time password for {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
return passwd
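# Illustrative flow (hypothetical useriden): issue a short-lived one time
# password which may then be used for a single authentication before its
# expires tick. The duration is in milliseconds (default 600000 = 10 min).
#
#     passwd = await cell.genUserOnepass(useriden, duration=300000)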
[docs]
async def setUserLocked(self, iden, locked):
user = await self.auth.reqUser(iden)
await user.setLocked(locked)
logger.info(f'Set lock={locked} for user {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
[docs]
async def setUserArchived(self, iden, archived):
user = await self.auth.reqUser(iden)
await user.setArchived(archived)
logger.info(f'Set archive={archived} for user {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, status='MODIFY'))
[docs]
async def getUserDef(self, iden, packroles=True):
user = self.auth.user(iden)
if user is not None:
return user.pack(packroles=packroles)
[docs]
async def getAuthGate(self, iden):
gate = self.auth.getAuthGate(iden)
if gate is None:
return None
return gate.pack()
[docs]
async def getAuthGates(self):
return [g.pack() for g in self.auth.getAuthGates()]
[docs]
async def getRoleDef(self, iden):
role = self.auth.role(iden)
if role is not None:
return role.pack()
[docs]
async def getUserDefByName(self, name):
user = await self.auth.getUserByName(name)
if user is not None:
return user.pack(packroles=True)
[docs]
async def getRoleDefByName(self, name):
role = await self.auth.getRoleByName(name)
if role is not None:
return role.pack()
[docs]
async def getUserDefs(self):
return [u.pack(packroles=True) for u in self.auth.users()]
[docs]
async def getRoleDefs(self):
return [r.pack() for r in self.auth.roles()]
[docs]
async def getAuthUsers(self, archived=False):
return [u.pack() for u in self.auth.users() if archived or not u.info.get('archived')]
[docs]
async def getAuthRoles(self):
return [r.pack() for r in self.auth.roles()]
[docs]
async def reqGateKeys(self, gatekeys):
for useriden, perm, gateiden in gatekeys:
(await self.auth.reqUser(useriden)).confirm(perm, gateiden=gateiden)
[docs]
async def feedBeholder(self, name, info, gates=None, perms=None):
'''
Feed a named event onto the ``cell:beholder`` message bus that will be sent to any listeners.
Args:
name (str): The name of the event to fire.
info (dict): An information dictionary to be sent to any consumers.
gates (list): List of gate idens, whose details will be added to the outbound message(s).
perms (list): List of permission names, whose details will be added to the outbound message(s).
Returns:
None
'''
kwargs = {
'event': name,
'offset': await self.nexsroot.index(),
'info': copy.deepcopy(info),
}
if gates:
g = []
for gate in gates:
authgate = await self.getAuthGate(gate)
if authgate is not None:
g.append(authgate)
kwargs['gates'] = g
if perms:
p = []
for perm in perms:
permdef = self.getPermDef(perm)
if permdef is not None:
p.append(permdef)
kwargs['perms'] = p
await self.fire('cell:beholder', **kwargs)
[docs]
@contextlib.asynccontextmanager
async def beholder(self):
async with await s_queue.Window.anit(maxsize=10000) as wind:
async def onEvent(mesg):
await wind.put(mesg[1])
with self.onWith('cell:beholder', onEvent):
yield wind
[docs]
async def behold(self):
async with self.beholder() as wind:
async for mesg in wind:
yield mesg
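# Consumer sketch, assuming `cell` is a running Cell (or an object sharing
# its API): each beholder message is a dict carrying at least "event",
# "offset" and "info" keys, plus "gates"/"perms" when the producer set them.
#
#     async for mesg in cell.behold():
#         print(mesg['offset'], mesg['event'], mesg['info'])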
[docs]
async def dyniter(self, iden, todo, gatekeys=()):
await self.reqGateKeys(gatekeys)
item = self.dynitems.get(iden)
if item is None:
raise s_exc.NoSuchIden(mesg=f'No dynitem for iden={iden}', iden=iden)
name, args, kwargs = todo
meth = getattr(item, name)
async for item in meth(*args, **kwargs):
yield item
[docs]
async def dyncall(self, iden, todo, gatekeys=()):
await self.reqGateKeys(gatekeys)
item = self.dynitems.get(iden)
if item is None:
raise s_exc.NoSuchIden(mesg=f'No dynitem for iden={iden}', iden=iden)
name, args, kwargs = todo
meth = getattr(item, name)
return await s_coro.ornot(meth, *args, **kwargs)
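# The todo convention used by dyniter()/dyncall() is a (name, args, kwargs)
# tuple naming a method on a registered dynitem; gatekeys entries are
# (useriden, perm, gateiden) tuples. A hedged sketch with hypothetical names:
#
#     todo = ('getFooByName', ('bar',), {'limit': 10})
#     retn = await cell.dyncall(itemiden, todo, gatekeys=())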
[docs]
async def getConfOpt(self, name):
return self.conf.get(name)
[docs]
def getUserName(self, iden, defv='<unknown>'):
'''
Translate the user iden to a user name.
'''
# since this pattern is so common, provide it as a utility...
user = self.auth.user(iden)
if user is None:
return defv
return user.name
[docs]
async def hasHttpSess(self, iden):
return self.sessstor.has(iden)
[docs]
async def genHttpSess(self, iden):
# TODO age out http sessions
sess = self.sessions.get(iden)
if sess is not None:
return sess
info = await self.getHttpSessDict(iden)
if info is None:
info = await self.addHttpSess(iden, {'created': s_common.now()})
sess = await s_httpapi.Sess.anit(self, iden, info)
self.sessions[iden] = sess
return sess
[docs]
async def getHttpSessDict(self, iden):
info = await self.sessstor.dict(iden)
if info:
return info
[docs]
@s_nexus.Pusher.onPushAuto('http:sess:add')
async def addHttpSess(self, iden, info):
for name, valu in info.items():
self.sessstor.set(iden, name, valu)
return info
[docs]
@s_nexus.Pusher.onPushAuto('http:sess:del')
async def delHttpSess(self, iden):
await self.sessstor.del_(iden)
sess = self.sessions.pop(iden, None)
if sess:
sess.info.clear()
self.schedCoro(sess.fini())
[docs]
@s_nexus.Pusher.onPushAuto('http:sess:set')
async def setHttpSessInfo(self, iden, name, valu):
self.sessstor.set(iden, name, valu)
sess = self.sessions.get(iden)
if sess is not None:
sess.info[name] = valu
[docs]
@s_nexus.Pusher.onPushAuto('http:sess:update')
async def updateHttpSessInfo(self, iden, vals: dict):
for name, valu in vals.items():
# validate every value before mutating the session store
if not s_msgpack.isok(valu):
mesg = f'Session info value for {name} is not msgpack safe.'
raise s_exc.BadArg(mesg=mesg, name=name)
for name, valu in vals.items():
self.sessstor.set(iden, name, valu)
sess = self.sessions.get(iden)
if sess is not None:
sess.info.update(vals)
[docs]
@contextlib.contextmanager
def getTempDir(self):
tdir = s_common.gendir(self.dirn, 'tmp')
with s_common.getTempDir(dirn=tdir) as dirn:
yield dirn
[docs]
async def addHttpsPort(self, port, host='0.0.0.0', sslctx=None):
'''
Add an HTTPS listener to the Cell.
Args:
port (int): The listening port to bind.
host (str): The listening host.
sslctx (ssl.SSLContext): An externally managed SSL Context.
Note:
If the SSL context is not provided, the Cell will assume it
manages the SSL context it creates for a given listener and will
add a reload handler named https:certs to enable reloading the
SSL certificates from disk.
'''
addr = socket.gethostbyname(host)
managed_ctx = False
if sslctx is None:
managed_ctx = True
pkeypath = os.path.join(self.dirn, 'sslkey.pem')
certpath = os.path.join(self.dirn, 'sslcert.pem')
if not os.path.isfile(certpath):
logger.warning('NO CERTIFICATE FOUND! generating self-signed certificate.')
tdir = s_common.gendir(self.dirn, 'tmp')
with s_common.getTempDir(dirn=tdir) as dirn:
cdir = s_certdir.CertDir(path=(dirn,))
pkey, cert = cdir.genHostCert(self.getCellType())
cdir.savePkeyPem(pkey, pkeypath)
cdir.saveCertPem(cert, certpath)
sslctx = self.initSslCtx(certpath, pkeypath)
kwargs = {
'xheaders': self.conf.req('https:parse:proxy:remoteip')
}
serv = self.wapp.listen(port, address=addr, ssl_options=sslctx, **kwargs)
self.httpds.append(serv)
lhost, lport = list(serv._sockets.values())[0].getsockname()
if managed_ctx:
# If the Cell is managing the SSLCtx, then we register a reload handler
# for it which reloads the pkey / cert.
def reload():
sslctx.load_cert_chain(certpath, pkeypath)
return True
self.addReloadableSystem('https:certs', reload)
self.https_listeners.append({'host': lhost, 'port': lport})
return (lhost, lport)
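# Hedged example: bind an HTTPS listener on an ephemeral port with a cell
# managed SSL context (a self-signed certificate is generated when none is
# found on disk). Returns the bound (host, port) tuple.
#
#     host, port = await cell.addHttpsPort(0, host='127.0.0.1')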
[docs]
def initSslCtx(self, certpath, keypath):
sslctx = s_certdir.getServerSSLContext()
if not os.path.isfile(keypath):
raise s_exc.NoSuchFile(mesg=f'Missing TLS keypath {keypath}', path=keypath)
if not os.path.isfile(certpath):
raise s_exc.NoSuchFile(mesg=f'Missing TLS certpath {certpath}', path=certpath)
sslctx.load_cert_chain(certpath, keypath)
return sslctx
def _log_web_request(self, handler: s_httpapi.Handler) -> None:
# Derived from https://github.com/tornadoweb/tornado/blob/v6.2.0/tornado/web.py#L2253
status = handler.get_status()
if status < 400:
log_method = t_log.access_log.info
elif status < 500:
log_method = t_log.access_log.warning
else:
log_method = t_log.access_log.error
request_time = 1000.0 * handler.request.request_time()
user = None
username = None
uri = handler.request.uri
remote_ip = handler.request.remote_ip
enfo = {'http_status': status,
'uri': uri,
'remoteip': remote_ip,
}
extra = {'synapse': enfo}
# It is possible that a Cell implementor may register handlers which
# do not derive from our Handler class, so we have to handle that.
if hasattr(handler, 'web_useriden') and handler.web_useriden:
user = handler.web_useriden
enfo['user'] = user
if hasattr(handler, 'web_username') and handler.web_username:
username = handler.web_username
enfo['username'] = username
if user:
mesg = f'{status} {handler.request.method} {uri} ({remote_ip}) user={user} ({username}) {request_time:.2f}ms'
else:
mesg = f'{status} {handler.request.method} {uri} ({remote_ip}) {request_time:.2f}ms'
log_method(mesg, extra=extra)
async def _getCellHttpOpts(self):
# Generate/Load a Cookie Secret
secpath = os.path.join(self.dirn, 'cookie.secret')
if not os.path.isfile(secpath):
with s_common.genfile(secpath) as fd:
fd.write(s_common.guid().encode('utf8'))
with s_common.getfile(secpath) as fd:
secret = fd.read().decode('utf8')
return {
'cookie_secret': secret,
'log_function': self._log_web_request,
'websocket_ping_interval': 10
}
async def _initCellHttp(self):
self.httpds = []
self.sessstor = s_lmdbslab.GuidStor(self.slab, 'http:sess')
async def fini():
[await s.fini() for s in self.sessions.values()]
for http in self.httpds:
http.stop()
self.onfini(fini)
opts = await self._getCellHttpOpts()
self.wapp = t_web.Application(**opts)
self._initCellHttpApis()
def _initCellHttpApis(self):
self.addHttpApi('/robots.txt', s_httpapi.RobotHandler, {'cell': self})
self.addHttpApi('/api/v1/login', s_httpapi.LoginV1, {'cell': self})
self.addHttpApi('/api/v1/logout', s_httpapi.LogoutV1, {'cell': self})
self.addHttpApi('/api/v1/active', s_httpapi.ActiveV1, {'cell': self})
self.addHttpApi('/api/v1/healthcheck', s_httpapi.HealthCheckV1, {'cell': self})
self.addHttpApi('/api/v1/auth/users', s_httpapi.AuthUsersV1, {'cell': self})
self.addHttpApi('/api/v1/auth/roles', s_httpapi.AuthRolesV1, {'cell': self})
self.addHttpApi('/api/v1/auth/adduser', s_httpapi.AuthAddUserV1, {'cell': self})
self.addHttpApi('/api/v1/auth/addrole', s_httpapi.AuthAddRoleV1, {'cell': self})
self.addHttpApi('/api/v1/auth/delrole', s_httpapi.AuthDelRoleV1, {'cell': self})
self.addHttpApi('/api/v1/auth/user/(.*)', s_httpapi.AuthUserV1, {'cell': self})
self.addHttpApi('/api/v1/auth/role/(.*)', s_httpapi.AuthRoleV1, {'cell': self})
self.addHttpApi('/api/v1/auth/password/(.*)', s_httpapi.AuthUserPasswdV1, {'cell': self})
self.addHttpApi('/api/v1/auth/grant', s_httpapi.AuthGrantV1, {'cell': self})
self.addHttpApi('/api/v1/auth/revoke', s_httpapi.AuthRevokeV1, {'cell': self})
self.addHttpApi('/api/v1/auth/onepass/issue', s_httpapi.OnePassIssueV1, {'cell': self})
self.addHttpApi('/api/v1/behold', s_httpapi.BeholdSockV1, {'cell': self})
[docs]
def addHttpApi(self, path, ctor, info):
self.wapp.add_handlers('.*', (
(path, ctor, info),
))
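# Registration sketch: expose a hypothetical handler class on the cell's
# tornado application. The path is a tornado URL regex, and the info dict
# is passed through to the handler's initialize().
#
#     class HelloV1(s_httpapi.Handler):
#         async def get(self):
#             self.write({'hello': 'world'})
#
#     cell.addHttpApi('/api/v1/hello', HelloV1, {'cell': cell})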
def _initCertDir(self):
self.certpath = s_common.gendir(self.dirn, 'certs')
# add our cert path to the global resolver
s_certdir.addCertPath(self.certpath)
syncerts = s_data.path('certs')
self.certdir = s_certdir.CertDir(path=(self.certpath, syncerts))
def fini():
s_certdir.delCertPath(self.certpath)
self.onfini(fini)
async def _initCellDmon(self):
ahainfo = {'name': self.ahasvcname}
self.dmon = await s_daemon.Daemon.anit(ahainfo=ahainfo)
self.dmon.share('*', self)
self.onfini(self.dmon.fini)
async def _initCellHive(self):
db = self.slab.initdb('hive')
hive = await s_hive.SlabHive.anit(self.slab, db=db, nexsroot=self.getCellNexsRoot(), cell=self)
self.onfini(hive)
return hive
async def _initSlabFile(self, path, readonly=False, ephemeral=False):
slab = await s_lmdbslab.Slab.anit(path, map_size=SLAB_MAP_SIZE, readonly=readonly)
slab.addResizeCallback(self.checkFreeSpace)
fini = slab.fini
if ephemeral:
fini = slab
self.onfini(fini)
return slab
async def _initCellSlab(self, readonly=False):
s_common.gendir(self.dirn, 'slabs')
path = os.path.join(self.dirn, 'slabs', 'cell.lmdb')
if not os.path.exists(path) and readonly:
logger.warning('Creating a slab for a readonly cell.')
_slab = await s_lmdbslab.Slab.anit(path, map_size=SLAB_MAP_SIZE)
_slab.initdb('hive')
await _slab.fini()
self.slab = await self._initSlabFile(path)
async def _initCellAuth(self):
# Add callbacks
self.on('user:del', self._onUserDelEvnt)
authctor = self.conf.get('auth:ctor')
if authctor is not None:
s_common.deprecated('auth:ctor cell config option', curv='2.157.0')
ctor = s_dyndeps.getDynLocal(authctor)
return await ctor(self)
maxusers = self.conf.get('max:users')
policy = self.conf.get('auth:passwd:policy')
seed = s_common.guid((self.iden, 'hive', 'auth'))
auth = await s_auth.Auth.anit(
self.slab,
'auth',
seed=seed,
nexsroot=self.getCellNexsRoot(),
maxusers=maxusers,
policy=policy
)
auth.link(self.dist)
def finilink():
auth.unlink(self.dist)
self.onfini(finilink)
self.onfini(auth.fini)
return auth
[docs]
def getCellNexsRoot(self):
# the "cell scope" nexusroot only exists if we are *not* embedded
# (aka we don't have a self.cellparent)
if self.cellparent is None:
return self.nexsroot
async def _initInauguralConfig(self):
if self.inaugural:
icfg = self.conf.get('inaugural')
if icfg is not None:
for rnfo in icfg.get('roles', ()):
name = rnfo.get('name')
logger.debug(f'Adding inaugural role {name}')
iden = s_common.guid((self.iden, 'auth', 'role', name))
role = await self.auth.addRole(name, iden) # type: s_auth.Role
for rule in rnfo.get('rules', ()):
await role.addRule(rule)
for unfo in icfg.get('users', ()):
name = unfo.get('name')
email = unfo.get('email')
iden = s_common.guid((self.iden, 'auth', 'user', name))
logger.debug(f'Adding inaugural user {name}')
user = await self.auth.addUser(name, email=email, iden=iden) # type: s_auth.User
if unfo.get('admin'):
await user.setAdmin(True)
for rolename in unfo.get('roles', ()):
role = await self.auth.reqRoleByName(rolename)
await user.grant(role.iden)
for rule in unfo.get('rules', ()):
await user.addRule(rule)
[docs]
@contextlib.asynccontextmanager
async def getLocalProxy(self, share='*', user='root'):
url = self.getLocalUrl(share=share, user=user)
prox = await s_telepath.openurl(url)
yield prox
[docs]
def getLocalUrl(self, share='*', user='root'):
return f'cell://{user}@{self.dirn}:{share}'
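# Shape sketch: for a cell stored at /path/to/cell (hypothetical path), the
# local URL and a matching context-managed proxy look like:
#
#     cell.getLocalUrl()  ->  'cell://root@/path/to/cell:*'
#
#     async with cell.getLocalProxy() as prox:
#         await prox.getCellIden()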
def _initCellConf(self, conf):
'''
Initialize a cell config during __anit__.
Args:
conf (s_config.Config, dict): A config object or dictionary.
Notes:
This does not pull environment variables or opts.
This only loads cell.yaml / cell.mods.yaml data in the event
we were handed a dictionary.
Returns:
s_config.Config: A config object.
'''
# if they hand in a dict, assume we are not the main/only one
if isinstance(conf, dict):
conf = s_config.Config.getConfFromCell(self, conf=conf)
path = s_common.genpath(self.dirn, 'cell.yaml')
conf.setConfFromFile(path)
mods_path = s_common.genpath(self.dirn, 'cell.mods.yaml')
conf.setConfFromFile(mods_path, force=True)
conf.reqConfValid() # Populate defaults
return conf
def _loadCellYaml(self, *path):
path = os.path.join(self.dirn, *path)
if os.path.isfile(path):
logger.debug('Loading file from [%s]', path)
return s_common.yamlload(path)
return {}
def _hasEasyPerm(self, item, user, level):
if level > PERM_ADMIN or level < PERM_DENY:
raise s_exc.BadArg(mesg=f'Invalid permission level: {level} (must be <= 3 and >= 0)')
if user.isAdmin():
return True
userlevel = item['permissions']['users'].get(user.iden)
if userlevel is not None:
if userlevel == 0:
return False
elif userlevel >= level:
return True
roleperms = item['permissions']['roles']
for role in user.getRoles():
rolelevel = roleperms.get(role.iden)
if rolelevel is not None:
if rolelevel == 0:
return False
elif rolelevel >= level:
return True
default = item['permissions'].get('default', PERM_READ)
if level <= default:
return True
return False
def _reqEasyPerm(self, item, user, level, mesg=None):
'''
Require the user (or an assigned role) to have the given permission
level on the specified item. The item must implement the "easy perm"
convention by having a key named "permissions" which adheres to the
easyPermSchema definition.
NOTE: By default a user will only be denied read access if they
(or an assigned role) have PERM_DENY assigned.
'''
if self._hasEasyPerm(item, user, level):
return
if mesg is None:
permname = permnames.get(level)
mesg = f'User ({user.name}) has insufficient permissions (requires: {permname}).'
raise s_exc.AuthDeny(mesg=mesg, user=user.iden, username=user.name)
async def _setEasyPerm(self, item, scope, iden, level):
'''
Set a user or role permission level within an object that uses the "easy perm"
convention. If level is None, permissions are removed.
'''
if scope not in ('users', 'roles'):
raise s_exc.BadArg(mesg=f'Invalid permissions scope: {scope} (must be "users" or "roles")')
if level is not None and (level > PERM_ADMIN or level < PERM_DENY):
raise s_exc.BadArg(mesg=f'Invalid permission level: {level} (must be <= 3 and >= 0, or None)')
if scope == 'users':
await self.auth.reqUser(iden)
elif scope == 'roles':
await self.auth.reqRole(iden)
perms = item['permissions'].get(scope)
if level is None:
perms.pop(iden, None)
else:
perms[iden] = level
def _initEasyPerm(self, item, default=PERM_READ):
'''
Ensure that the given object has populated the "easy perm" convention.
'''
if default > PERM_ADMIN or default < PERM_DENY:
mesg = f'Invalid permission level: {default} (must be <= {PERM_ADMIN} and >= {PERM_DENY})'
raise s_exc.BadArg(mesg=mesg)
item.setdefault('permissions', {})
item['permissions'].setdefault('users', {})
item['permissions'].setdefault('roles', {})
item['permissions'].setdefault('default', default)
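# Shape of the "easy perm" convention after _initEasyPerm() (the idens are
# hypothetical placeholders): levels run PERM_DENY=0 through PERM_ADMIN=3,
# and an explicit 0 on a user or role short-circuits to deny.
#
#     item = {
#         'permissions': {
#             'users': {'<useriden>': PERM_EDIT},
#             'roles': {'<roleiden>': PERM_READ},
#             'default': PERM_READ,
#         },
#     }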
[docs]
async def getTeleApi(self, link, mesg, path):
# if auth is disabled or it's a unix socket, they're root.
if link.get('unix'):
name = 'root'
auth = mesg[1].get('auth')
if auth is not None:
name, info = auth
# By design, unix sockets can auth as any user.
user = await self.auth.getUserByName(name)
if user is None:
raise s_exc.NoSuchUser(username=name, mesg=f'No such user: {name}.')
else:
user = await self._getCellUser(link, mesg)
return await self.getCellApi(link, user, path)
[docs]
async def getCellApi(self, link, user, path):
'''
Get an instance of the telepath Client object for a given user, link and path.
Args:
link (s_link.Link): The link object.
user (s_auth.User): The heavy user object.
path (str): The path requested.
Notes:
This defaults to the self.cellapi class. Implementors may override the
default class attribute for cellapi to share a different interface.
Returns:
object: The shared object for this cell.
'''
return await self.cellapi.anit(self, link, user)
async def _getSpawnLogConf(self):
conf = self.conf.get('_log_conf')
if conf:
conf = conf.copy()
else:
conf = s_common._getLogConfFromEnv()
conf['log_setup'] = False
return conf
[docs]
def modCellConf(self, conf):
'''
Modify the Cell's on-disk configuration overrides file and runtime configuration.
Args:
conf (dict): A dictionary of items to set.
Notes:
This does require the data being set to be schema valid.
Returns:
None.
'''
for key, valu in conf.items():
self.conf.reqKeyValid(key, valu)
logger.info(f'Setting cell config override for [{key}]')
self.conf.update(conf)
s_common.yamlmod(conf, self.dirn, 'cell.mods.yaml')
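# Hedged example: persist a runtime override into cell.mods.yaml and later
# remove it, allowing any schema default to repopulate.
#
#     cell.modCellConf({'nexslog:en': True})
#     cell.popCellConf('nexslog:en')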
[docs]
def popCellConf(self, name):
'''
Remove a key from the Cell's on-disk configuration overrides file and
runtime configuration.
Args:
name (str): Name of the value to remove.
Notes:
This does **not** modify the cell.yaml file.
This does re-validate the configuration after removing the value,
so if the value removed had a default populated by schema, that
default would be reset.
Returns:
None
'''
self.conf.pop(name, None)
self.conf.reqConfValid()
s_common.yamlpop(name, self.dirn, 'cell.mods.yaml')
logger.info(f'Removed cell config override for [{name}]')
[docs]
@classmethod
def getCellType(cls):
return cls.__name__.lower()
[docs]
@classmethod
def getEnvPrefix(cls):
'''Get a list of envar prefixes for config resolution.'''
return (f'SYN_{cls.__name__.upper()}', )
[docs]
def getCellIden(self):
return self.iden
[docs]
async def getCellRunId(self):
return self.runid
[docs]
@classmethod
def initCellConf(cls, conf=None):
'''
Create a Config object for the Cell.
Args:
conf (s_config.Config): An optional config object; its ``_opts_data`` is copied onto the new Config.
Notes:
The Config object has a ``envar_prefix`` set according to the results of ``cls.getEnvPrefix()``.
Returns:
s_config.Config: A Config helper object.
'''
prefixes = cls.getEnvPrefix()
schema = s_config.getJsSchema(cls.confbase, cls.confdefs)
config = s_config.Config(schema, envar_prefixes=prefixes)
if conf:
config._opts_data = conf._opts_data
return config
[docs]
@classmethod
def getArgParser(cls, conf=None):
'''
Get an ``argparse.ArgumentParser`` for the Cell.
Args:
conf (s_config.Config): Optional, a Config object whose argparse arguments are added in a ``config`` argument group.
Notes:
Boot time configuration data is placed in the argument group called ``config``.
This adds default ``dirn``, ``--telepath``, ``--https`` and ``--name`` arguments to the argparser instance.
Configuration values which have the ``hideconf`` or ``hidecmdl`` value set to True are not added to the
argparser instance.
Returns:
argparse.ArgumentParser: A ArgumentParser for the Cell.
'''
name = cls.getCellType()
prefix = cls.getEnvPrefix()[0]
pars = argparse.ArgumentParser(prog=name)
pars.add_argument('dirn', help=f'The storage directory for the {name} service.')
pars.add_argument('--log-level', default='INFO', choices=list(s_const.LOG_LEVEL_CHOICES.keys()),
help='Specify the Python logging log level.', type=str.upper)
pars.add_argument('--structured-logging', default=False, action='store_true',
help='Use structured logging.')
telendef = None
telepdef = 'tcp://0.0.0.0:27492'
httpsdef = 4443
telenvar = '_'.join((prefix, 'NAME'))
telepvar = '_'.join((prefix, 'TELEPATH'))
httpsvar = '_'.join((prefix, 'HTTPS'))
telen = os.getenv(telenvar, telendef)
telep = os.getenv(telepvar, telepdef)
https = os.getenv(httpsvar, httpsdef)
pars.add_argument('--telepath', default=telep, type=str,
help=f'The telepath URL to listen on. This defaults to {telepdef}, and may '
f'also be overridden by the {telepvar} environment variable.')
pars.add_argument('--https', default=https, type=int,
help=f'The port to bind for the HTTPS/REST API. This defaults to {httpsdef}, '
f'and may also be overridden by the {httpsvar} environment variable.')
pars.add_argument('--name', type=str, default=telen,
help=f'The (optional) additional name to share the {name} as. This defaults to '
f'{telendef}, and may also be overridden by the {telenvar} environment'
f' variable.')
if conf is not None:
args = conf.getArgParseArgs()
if args:
pgrp = pars.add_argument_group('config', 'Configuration arguments.')
for (argname, arginfo) in args:
pgrp.add_argument(argname, **arginfo)
return pars
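# Resolution sketch for a hypothetical cell class named MyCell (env prefix
# SYN_MYCELL): these environment variables seed the CLI defaults above.
#
#     SYN_MYCELL_TELEPATH=tcp://0.0.0.0:27492
#     SYN_MYCELL_HTTPS=4443
#     SYN_MYCELL_NAME=mycell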
async def _initCellBoot(self):
# NOTE: best hook point for custom provisioning
isok, pnfo = await self._bootCellProv()
# check this before we setup loadTeleCell()
if not self._mustBootMirror():
return
if not isok:
# The way that we get to this requires the following states to be true:
# 1. self.dirn/cell.guid file is NOT present in the service directory.
# 2. mirror config is present.
# 3. aha:provision config is not set OR the aha:provision guid matches the self.dirn/prov.done file.
mesg = 'Service has been configured to boot from an upstream mirror, but has entered into an invalid ' \
'state. This may have been caused by manipulation of the service storage or an error during a ' \
f'backup / restore operation. {pnfo.get("mesg")}'
raise s_exc.FatalErr(mesg=mesg)
async with s_telepath.loadTeleCell(self.dirn):
await self._bootCellMirror(pnfo)
@classmethod
async def _initBootRestore(cls, dirn):
env = 'SYN_RESTORE_HTTPS_URL'
rurl = os.getenv(env, None)
if rurl is None:
return
dirn = s_common.gendir(dirn)
# restore.done - Allow an existing URL to be left in the configuration
# for a service without issues.
doneiden = None
donepath = s_common.genpath(dirn, 'restore.done')
if os.path.isfile(donepath):
with s_common.genfile(donepath) as fd:
doneiden = fd.read().decode().strip()
rurliden = s_common.guid(rurl)
if doneiden == rurliden:
logger.warning(f'restore.done matched value from {env}. It is recommended to remove the {env} value.')
return
clean_url = s_urlhelp.sanitizeUrl(rurl).rsplit('?', 1)[0]
logger.warning(f'Restoring {cls.getCellType()} from {env}={clean_url}')
# First, clear any existing files out of the directory. This avoids the
# possibility of a restore that mixes old and new data.
efiles = os.listdir(dirn)
for fn in efiles:
fp = os.path.join(dirn, fn)
if os.path.isfile(fp):
logger.warning(f'Removing existing file: {fp}')
os.unlink(fp)
continue
if os.path.isdir(fp):
logger.warning(f'Removing existing directory: {fp}')
shutil.rmtree(fp)
continue
logger.warning(f'Unhandled existing file: {fp}') # pragma: no cover
# Setup get args
insecure_marker = 'https+insecure://'
kwargs = {}
if rurl.startswith(insecure_marker):
logger.warning('Disabling SSL verification for restore request.')
kwargs['ssl'] = False
rurl = 'https://' + rurl[len(insecure_marker):]
tmppath = s_common.gendir(dirn, 'tmp')
tarpath = s_common.genpath(tmppath, f'restore_{rurliden}.tgz')
try:
with s_common.genfile(tarpath) as fd:
# Leave a 60 second timeout check for the connection and reads.
# Disable total timeout
timeout = aiohttp.client.ClientTimeout(
total=None,
connect=60,
sock_read=60,
sock_connect=60,
)
async with aiohttp.client.ClientSession(timeout=timeout) as sess:
async with sess.get(rurl, **kwargs) as resp:
resp.raise_for_status()
content_length = int(resp.headers.get('content-length', 0))
if content_length > 100:
logger.warning(f'Downloading {content_length/s_const.megabyte:.3f} MB of data.')
pvals = [int((content_length * 0.01) * i) for i in range(1, 100)]
else: # pragma: no cover
logger.warning(f'Odd content-length encountered: {content_length}')
pvals = []
csize = s_const.kibibyte * 64 # default read chunksize for ClientSession
tsize = 0
async for chunk in resp.content.iter_chunked(csize):
fd.write(chunk)
tsize = tsize + len(chunk)
if pvals and tsize > pvals[0]:
pvals.pop(0)
percentage = (tsize / content_length) * 100
logger.warning(f'Downloaded {tsize/s_const.megabyte:.3f} MB, {percentage:.3f}%')
logger.warning(f'Extracting {tarpath} to {dirn}')
with tarfile.open(tarpath) as tgz:
for memb in tgz.getmembers():
if memb.name.find('/') == -1:
continue
memb.name = memb.name.split('/', 1)[1]
logger.warning(f'Extracting {memb.name}')
tgz.extract(memb, dirn)
# and record the rurliden
with s_common.genfile(donepath) as fd:
fd.truncate(0)
fd.seek(0)
fd.write(rurliden.encode())
except asyncio.CancelledError: # pragma: no cover
raise
except Exception: # pragma: no cover
logger.exception('Failed to restore cell from URL.')
raise
else:
logger.warning('Restored service from URL')
return
finally:
if os.path.isfile(tarpath):
os.unlink(tarpath)
async def _bootCellProv(self):
provurl = self.conf.get('aha:provision')
if provurl is None:
return False, {'mesg': 'No aha:provision configuration has been provided to allow the service to '
'bootstrap via AHA.'}
doneiden = None
donepath = s_common.genpath(self.dirn, 'prov.done')
if os.path.isfile(donepath):
with s_common.genfile(donepath) as fd:
doneiden = fd.read().decode().strip()
urlinfo = s_telepath.chopurl(provurl)
providen = urlinfo.get('path').strip('/')
if doneiden == providen:
return False, {'mesg': f'The aha:provision URL guid matches the service prov.done guid, '
f'aha:provision={provurl}'}
logger.info(f'Provisioning {self.getCellType()} from AHA service.')
certdir = s_certdir.CertDir(path=(s_common.gendir(self.dirn, 'certs'),))
async with await s_telepath.openurl(provurl) as prov:
provinfo = await prov.getProvInfo()
provconf = provinfo.get('conf', {})
ahauser = provconf.get('aha:user')
ahaname = provconf.get('aha:name')
ahanetw = provconf.get('aha:network')
_crt = certdir.getCaCertPath(ahanetw)
if _crt:
logger.debug(f'Removing existing CA crt: {_crt}')
os.unlink(_crt)
certdir.saveCaCertByts(await prov.getCaCert())
await self._bootProvConf(provconf)
hostname = f'{ahaname}.{ahanetw}'
_crt = certdir.getHostCertPath(hostname)
if _crt:
logger.debug(f'Removing existing host crt {_crt}')
os.unlink(_crt)
_kp = certdir.getHostKeyPath(hostname)
if _kp:
logger.debug(f'Removing existing host key {_kp}')
os.unlink(_kp)
_csr = certdir.getHostCsrPath(hostname)
if _csr:
logger.debug(f'Removing existing host csr {_csr}')
os.unlink(_csr)
hostcsr = certdir.genHostCsr(hostname)
certdir.saveHostCertByts(await prov.signHostCsr(hostcsr))
userfull = f'{ahauser}@{ahanetw}'
_crt = certdir.getUserCertPath(userfull)
if _crt:
logger.debug(f'Removing existing user crt {_crt}')
os.unlink(_crt)
_kp = certdir.getUserKeyPath(userfull)
if _kp:
logger.debug(f'Removing existing user key {_kp}')
os.unlink(_kp)
_csr = certdir.getUserCsrPath(userfull)
if _csr:
logger.debug(f'Removing existing user csr {_csr}')
os.unlink(_csr)
usercsr = certdir.genUserCsr(userfull)
certdir.saveUserCertByts(await prov.signUserCsr(usercsr))
with s_common.genfile(self.dirn, 'prov.done') as fd:
fd.write(providen.encode())
logger.info(f'Done provisioning {self.getCellType()} AHA service.')
return True, {'conf': provconf, 'iden': providen}
async def _bootProvConf(self, provconf):
'''
Get a configuration object for booting the cell from an AHA configuration.
Args:
provconf (dict): A dictionary containing provisioning config data from AHA.
Notes:
The cell.yaml will have the "mirror" key removed from it.
The cell.yaml will be modified with data from provconf.
The cell.mods.yaml will have the "mirror" key removed from it.
The cell.mods.yaml will have any keys in the prov conf removed from it.
This sets the runtime configuration as well.
Returns:
s_config.Config: The new config object to be used.
'''
# replace our runtime config with the updated config with provconf data
new_conf = self.initCellConf(self.conf)
new_conf.setdefault('_log_conf', await self._getSpawnLogConf())
# Load any opts we have and environment variables.
new_conf.setConfFromOpts()
new_conf.setConfFromEnvs()
# Validate provconf, and insert it into cell.yaml
for name, valu in provconf.items():
new_conf.reqKeyValid(name, valu)
cell_path = s_common.genpath(self.dirn, 'cell.yaml')
# Slice the mirror option out of the cell.yaml file. This avoids
# a situation where restoring a backup from a cell which was provisioned
# as a mirror is then provisioned as a leader and then has an extra
# cell config value which conflicts with the new provconf.
s_common.yamlpop('mirror', cell_path)
# Inject the provconf value into the cell configuration
s_common.yamlmod(provconf, cell_path)
# load cell.yaml, still preferring actual config data from opts/envs.
new_conf.setConfFromFile(cell_path)
# Remove any keys from overrides that were set from provconf
# then load the file
mods_path = s_common.genpath(self.dirn, 'cell.mods.yaml')
for key in provconf:
s_common.yamlpop(key, mods_path)
# Slice the mirror option out of the cell.mods.yaml file. This avoids
# a situation where restoring a backup from a cell which was demoted
# from a leader to a follower has a config value which conflicts with
# the new provconf.
s_common.yamlpop('mirror', mods_path)
new_conf.setConfFromFile(mods_path, force=True)
# Ensure defaults are set
new_conf.reqConfValid()
self.conf = new_conf
return new_conf
def _mustBootMirror(self):
path = s_common.genpath(self.dirn, 'cell.guid')
if os.path.isfile(path):
return False
murl = self.conf.get('mirror')
if murl is None:
return False
return True
[docs]
async def readyToMirror(self):
if not self.conf.get('nexslog:en'):
self.modCellConf({'nexslog:en': True})
await self.nexsroot.enNexsLog()
await self.sync()
async def _initCloneCell(self, proxy):
tarpath = s_common.genpath(self.dirn, 'tmp', 'bootstrap.tgz')
try:
await proxy.readyToMirror()
with s_common.genfile(tarpath) as fd:
async for byts in proxy.iterNewBackupArchive(remove=True):
fd.write(byts)
with tarfile.open(tarpath) as tgz:
for memb in tgz.getmembers():
if memb.name.find('/') == -1:
continue
memb.name = memb.name.split('/', 1)[1]
tgz.extract(memb, self.dirn)
finally:
if os.path.isfile(tarpath):
os.unlink(tarpath)
async def _bootCellMirror(self, pnfo):
# this function must assume almost nothing is initialized
# but that's ok since it will only run rarely.
# It assumes it has a tuple of (provisioning configuration, provisioning iden) available
murl = self.conf.req('mirror')
provconf, providen = pnfo.get('conf'), pnfo.get('iden')
logger.warning(f'Bootstrap mirror from: {murl} (this could take a while!)')
async with await s_telepath.openurl(murl) as proxy:
await self._initCloneCell(proxy)
# Remove aha:provision from cell.yaml if it exists and the iden differs.
mnfo = s_common.yamlload(self.dirn, 'cell.yaml')
if mnfo:
provurl = mnfo.get('aha:provision', None)
if provurl:
murlinfo = s_telepath.chopurl(provurl)
miden = murlinfo.get('path').strip('/')
if miden != providen:
s_common.yamlpop('aha:provision', self.dirn, 'cell.yaml')
logger.debug('Removed aha:provision from cell.yaml')
await self._bootProvConf(provconf)
# Overwrite the prov.done file that may have come from
# the upstream backup.
with s_common.genfile(self.dirn, 'prov.done') as fd:
fd.truncate(0)
fd.write(providen.encode())
logger.warning(f'Bootstrap mirror from: {murl} DONE!')
[docs]
async def getMirrorUrls(self):
if self.ahaclient is None:
raise s_exc.BadConfValu(mesg='Enumerating mirror URLs is only supported when AHA is configured')
await self.ahaclient.waitready()
proxy = await self.ahaclient.proxy(timeout=5)
mirrors = await proxy.getAhaSvcMirrors(self.ahasvcname)
if mirrors is None:
mesg = 'Service must be configured with AHA to enumerate mirror URLs'
raise s_exc.NoSuchName(mesg=mesg, name=self.ahasvcname)
return [f'aha://{svc["svcname"]}.{svc["svcnetw"]}' for svc in mirrors]
[docs]
@classmethod
async def initFromArgv(cls, argv, outp=None):
'''
Cell launcher which does automatic argument parsing, environment variable resolution and Cell creation.
Args:
argv (list): A list of command line arguments to launch the Cell with.
outp (s_output.OutPut): Optional, an output object. No longer used in the default implementation.
Notes:
This does the following:
- Creates a Config object from the Cell class.
- Creates an Argument Parser from the Cell class and Config object.
- Parses the provided arguments.
- Loads configuration data from the parsed options and environment variables.
- Sets logging for the process.
- Creates the Cell from the Cell Ctor.
- Adds a Telepath listener, HTTPS port listeners, and Telepath share names.
- Returns the Cell.
Returns:
Cell: This returns an instance of the Cell.
'''
conf = cls.initCellConf()
pars = cls.getArgParser(conf=conf)
opts = pars.parse_args(argv)
path = s_common.genpath(opts.dirn, 'cell.yaml')
mods_path = s_common.genpath(opts.dirn, 'cell.mods.yaml')
logconf = s_common.setlogging(logger, defval=opts.log_level,
structlog=opts.structured_logging)
logger.info(f'Starting {cls.getCellType()} version {cls.VERSTRING}, Synapse version: {s_version.verstring}',
extra={'synapse': {'svc_type': cls.getCellType(), 'svc_version': cls.VERSTRING,
'synapse_version': s_version.verstring}})
await cls._initBootRestore(opts.dirn)
try:
conf.setdefault('_log_conf', logconf)
conf.setConfFromOpts(opts)
conf.setConfFromEnvs()
conf.setConfFromFile(path)
conf.setConfFromFile(mods_path, force=True)
except (asyncio.CancelledError, Exception):
logger.exception('Error while bootstrapping cell config.')
raise
s_coro.set_pool_logging(logger, logconf=conf['_log_conf'])
try:
cell = await cls.anit(opts.dirn, conf=conf)
except (asyncio.CancelledError, Exception):
logger.exception(f'Error starting cell at {opts.dirn}')
raise
try:
turl = cell._getDmonListen()
if turl is None:
turl = opts.telepath
await cell.dmon.listen(turl)
logger.info(f'...{cell.getCellType()} API (telepath): {turl}')
if 'https:port' not in cell.conf:
await cell.addHttpsPort(opts.https)
logger.info(f'...{cell.getCellType()} API (https): {opts.https}')
else:
port = cell.conf.get('https:port')
if port is None:
logger.info(f'...{cell.getCellType()} API (https): disabled')
else:
logger.info(f'...{cell.getCellType()} API (https): {port}')
if opts.name is not None:
cell.dmon.share(opts.name, cell)
logger.info(f'...{cell.getCellType()} API (telepath name): {opts.name}')
except (asyncio.CancelledError, Exception):
await cell.fini()
raise
return cell
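# Illustrative sketch (assumed directory, port, and flag values; not from the
# original module): launching a Cell directly with initFromArgv().
#
#     argv = ['/path/to/celldirn',
#             '--telepath', 'tcp://127.0.0.1:27492',
#             '--https', '4443',
#             '--name', 'cell']
#     cell = await Cell.initFromArgv(argv)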
@classmethod
async def execmain(cls, argv, outp=None):
'''
The main entry point for running the Cell as an application.
Args:
argv (list): A list of command line arguments to launch the Cell with.
outp (s_output.OutPut): Optional, an output object. No longer used in the default implementation.
Notes:
This coroutine waits until the Cell is fini'd or a SIGINT/SIGTERM signal is sent to the process.
Returns:
None.
'''
if outp is None:
outp = s_output.stdout
cell = await cls.initFromArgv(argv, outp=outp)
await cell.main()
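# Illustrative sketch of a server entry point built on execmain(); assumes
# `import sys` and is not part of the original module.
#
#     if __name__ == '__main__':
#         asyncio.run(Cell.execmain(sys.argv[1:]))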
async def _getCellUser(self, link, mesg):
# check for a TLS client cert
username = link.getTlsPeerCn()
if username is not None:
if username.find('@') != -1:
userpart, hostpart = username.split('@', 1)
if hostpart == self.conf.get('aha:network'):
user = await self.auth.getUserByName(userpart)
if user is not None:
if user.isLocked():
raise s_exc.AuthDeny(mesg=f'User ({userpart}) is locked.', user=user.iden, username=userpart)
return user
user = await self.auth.getUserByName(username)
if user is not None:
if user.isLocked():
raise s_exc.AuthDeny(mesg=f'User ({username}) is locked.', user=user.iden, username=username)
return user
raise s_exc.NoSuchUser(mesg=f'TLS client cert User not found: {username}', username=username)
auth = mesg[1].get('auth')
if auth is None:
anonuser = self.conf.get('auth:anon')
if anonuser is None:
raise s_exc.AuthDeny(mesg='Unable to find cell user: no auth provided and no anonymous user is configured.')
user = await self.auth.getUserByName(anonuser)
if user is None:
raise s_exc.AuthDeny(mesg=f'Anon user ({anonuser}) is not found.', username=anonuser)
if user.isLocked():
raise s_exc.AuthDeny(mesg=f'Anon user ({anonuser}) is locked.', username=anonuser,
user=user.iden)
return user
name, info = auth
user = await self.auth.getUserByName(name)
if user is None:
raise s_exc.NoSuchUser(username=name, mesg=f'No such user: {name}.')
# passwd None always fails...
passwd = info.get('passwd')
if not await user.tryPasswd(passwd):
raise s_exc.AuthDeny(mesg='Invalid password', username=user.name, user=user.iden)
return user
async def getHealthCheck(self):
health = s_health.HealthCheck(self.getCellIden())
for func in self._health_funcs:
await func(health)
return health.pack()
def addHealthFunc(self, func):
'''Register a callback function used to populate the HealthCheck object.'''
self._health_funcs.append(func)
async def _cellHealth(self, health):
pass
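# Illustrative sketch (assumed names; assumes the HealthCheck.update() helper
# from synapse.lib.health): register a callback which populates the health
# object returned by getHealthCheck().
#
#     async def _queueHealth(health):
#         health.update('queue', 'nominal', mesg='Queue depth ok.')
#
#     cell.addHealthFunc(_queueHealth)
#     summary = await cell.getHealthCheck()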
async def getDmonSessions(self):
return await self.dmon.getSessInfo()
# ----- Change distributed Auth methods ----
async def listHiveKey(self, path=None):
if path is None:
path = ()
items = self.hive.dir(path)
if items is None:
return None
return [item[0] for item in items]
async def getHiveKeys(self, path):
'''
Return a list of (name, value) tuples for nodes under the path.
'''
items = self.hive.dir(path)
if items is None:
return ()
return [(i[0], i[1]) for i in items]
async def getHiveKey(self, path):
'''
Get the value of a key in the cell default hive.
'''
return await self.hive.get(path)
async def setHiveKey(self, path, valu):
'''
Set or change the value of a key in the cell default hive.
'''
return await self.hive.set(path, valu, nexs=True)
async def popHiveKey(self, path):
'''
Remove and return the value of a key in the cell default hive.
Note: this is for expert emergency use only.
'''
return await self.hive.pop(path, nexs=True)
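# Illustrative sketch of the hive key helpers (assumed key names):
#
#     await cell.setHiveKey(('beeps', 'widget'), 10)
#     valu = await cell.getHiveKey(('beeps', 'widget'))    # 10
#     names = await cell.listHiveKey(('beeps',))           # ['widget']
#     await cell.popHiveKey(('beeps', 'widget'))           # expert use only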
async def saveHiveTree(self, path=()):
return await self.hive.saveHiveTree(path=path)
async def loadHiveTree(self, tree, path=(), trim=False):
'''
Note: this is for expert emergency use only.
'''
return await self._push('hive:loadtree', tree, path, trim)
@s_nexus.Pusher.onPush('hive:loadtree')
async def _onLoadHiveTree(self, tree, path, trim):
return await self.hive.loadHiveTree(tree, path=path, trim=trim)
async def iterSlabData(self, name, prefix=''):
slabkv = self.slab.getSafeKeyVal(name, prefix=prefix, create=False)
for key, valu in slabkv.items():
yield key, valu
await asyncio.sleep(0)
@s_nexus.Pusher.onPushAuto('sync')
async def sync(self):
'''
No-op mutation for testing purposes. If this cell is a follower, then by the
time this call returns it has received and applied all writes that occurred
on the leader before this call.
'''
return
async def ps(self, user):
isallowed = await self.isUserAllowed(user.iden, ('task', 'get'))
retn = []
for task in self.boss.ps():
if (task.user.iden == user.iden) or isallowed:
retn.append(task.pack())
return retn
async def kill(self, user, iden):
perm = ('task', 'del')
isallowed = await self.isUserAllowed(user.iden, perm)
logger.info(f'User [{user.name}] Requesting task kill: {iden}')
task = self.boss.get(iden)
if task is None:
logger.info(f'Task does not exist: {iden}')
return False
if (task.user.iden == user.iden) or isallowed:
logger.info(f'Killing task: {iden}')
await task.kill()
logger.info(f'Task killed: {iden}')
return True
perm = '.'.join(perm)
raise s_exc.AuthDeny(mesg=f'User ({user.name}) must have permission {perm} or own the task',
task=iden, user=user.iden, username=user.name, perm=perm)
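# Illustrative sketch (assumed names): list tasks visible to a user and kill
# them; packed tasks are assumed to carry an 'iden' key.
#
#     for task in await cell.ps(user):
#         await cell.kill(user, task.get('iden'))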
async def getCellInfo(self):
'''
Return metadata specific for the Cell.
Notes:
By default, this function returns information about the base Cell
implementation, which reflects the base information in the Synapse
Cell.
It is expected that implementers override the following Class
attributes in order to provide meaningful version information:
``COMMIT`` - A Git Commit
``VERSION`` - A Version tuple.
``VERSTRING`` - A Version string.
Returns:
Dict: A Dictionary of metadata.
'''
mirror = self.conf.get('mirror')
if mirror is not None:
mirror = s_urlhelp.sanitizeUrl(mirror)
ret = {
'synapse': {
'commit': s_version.commit,
'version': s_version.version,
'verstring': s_version.verstring,
},
'cell': {
'run': await self.getCellRunId(),
'type': self.getCellType(),
'iden': self.getCellIden(),
'paused': self.paused,
'active': self.isactive,
'started': self.startms,
'ready': self.nexsroot.ready.is_set(),
'commit': self.COMMIT,
'version': self.VERSION,
'verstring': self.VERSTRING,
'cellvers': dict(self.cellvers.items()),
'nexsindx': await self.getNexsIndx(),
'uplink': self.nexsroot.miruplink.is_set(),
'mirror': mirror,
'aha': {
'name': self.conf.get('aha:name'),
'leader': self.conf.get('aha:leader'),
'network': self.conf.get('aha:network'),
},
'network': {
'https': self.https_listeners,
}
},
'features': {
'tellready': True,
'dynmirror': True,
},
}
return ret
async def getSystemInfo(self):
'''
Get info about the system in which the cell is running
Returns:
A dictionary with the following keys. All size values are in bytes:
- volsize - Volume where cell is running total space
- volfree - Volume where cell is running free space
- backupvolsize - Backup directory volume total space
- backupvolfree - Backup directory volume free space
- cellstarttime - Cell start time in epoch milliseconds
- celluptime - Cell uptime in milliseconds
- cellrealdisk - Cell's use of disk, equivalent to du
- cellapprdisk - Cell's apparent use of disk, equivalent to ls -l
- osversion - OS version/architecture
- pyversion - Python version
- totalmem - Total memory in the system
- availmem - Available memory in the system
- cpucount - Number of CPUs on system
- tmpdir - The temporary directory interpreted by the Python runtime.
'''
uptime = int((time.monotonic() - self.starttime) * 1000)
disk = shutil.disk_usage(self.dirn)
if self.backdirn:
backupdisk = shutil.disk_usage(self.backdirn)
backupvolsize, backupvolfree = backupdisk.total, backupdisk.free
else:
backupvolsize, backupvolfree = 0, 0
myusage, myappusage = s_common.getDirSize(self.dirn)
totalmem = s_thisplat.getTotalMemory()
availmem = s_thisplat.getAvailableMemory()
pyversion = platform.python_version()
cpucount = multiprocessing.cpu_count()
sysctls = s_thisplat.getSysctls()
tmpdir = s_thisplat.getTempDir()
retn = {
'volsize': disk.total, # Volume where cell is running total bytes
'volfree': disk.free, # Volume where cell is running free bytes
'backupvolsize': backupvolsize, # Cell's backup directory volume total bytes
'backupvolfree': backupvolfree, # Cell's backup directory volume free bytes
'cellstarttime': self.startms, # cell start time in epoch millis
'celluptime': uptime, # cell uptime in ms
'cellrealdisk': myusage, # Cell's use of disk, equivalent to du
'cellapprdisk': myappusage, # Cell's apparent use of disk, equivalent to ls -l
'osversion': platform.platform(), # OS version/architecture
'pyversion': pyversion, # Python version
'totalmem': totalmem, # Total memory in the system
'availmem': availmem, # Available memory in the system
'cpucount': cpucount, # Number of CPUs on system
'sysctls': sysctls, # Performance related sysctls
'tmpdir': tmpdir, # Temporary File / Folder Directory
}
return retn
@contextlib.asynccontextmanager
async def getSpooledSet(self):
async with await s_spooled.Set.anit(dirn=self.dirn, cell=self) as sset:
yield sset
@contextlib.asynccontextmanager
async def getSpooledDict(self):
async with await s_spooled.Dict.anit(dirn=self.dirn, cell=self) as sdict:
yield sdict
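# Illustrative sketch (assumed iterator): spooled collections start in memory
# and spill to disk under the cell directory once they grow large, making them
# safe for unbounded deduplication.
#
#     async with cell.getSpooledSet() as sset:
#         async for item in somehugeiter():    # hypothetical iterator
#             await sset.add(item)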
async def addSignalHandlers(self):
await s_base.Base.addSignalHandlers(self)
def sighup():
logger.info('Caught SIGHUP, running reloadable subsystems.')
task = asyncio.create_task(self.reload())
self._syn_signal_tasks.add(task)
task.add_done_callback(self._syn_signal_tasks.discard)
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGHUP, sighup)
def addReloadableSystem(self, name: str, func: callable):
'''
Add a reloadable system. This may be called dynamically at any time.
Args:
name (str): Name of the system.
func: The callable for reloading a given system.
Note:
Reload functions take no arguments when they are executed.
Values returned by the reload function must be msgpack friendly.
Returns:
None
'''
self._reloadfuncs[name] = func
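# Illustrative sketch (assumed names): register a reload function and invoke
# it via reload(). The callable takes no arguments and must return a msgpack
# friendly value.
#
#     def _reloadCerts():
#         # hypothetical helper which re-reads TLS material from disk
#         return True
#
#     cell.addReloadableSystem('certs', _reloadCerts)
#     results = await cell.reload('certs')    # {'certs': (True, True)}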
def getReloadableSystems(self):
return tuple(self._reloadfuncs.keys())
async def reload(self, subsystem=None):
ret = {}
if subsystem:
func = self._reloadfuncs.get(subsystem)
if func is None:
raise s_exc.NoSuchName(mesg=f'No reload system named {subsystem}',
name=subsystem)
ret[subsystem] = await self._runReloadFunc(subsystem, func)
else:
# Run all funcs
for (rname, func) in self._reloadfuncs.items():
ret[rname] = await self._runReloadFunc(rname, func)
return ret
async def _runReloadFunc(self, name, func):
try:
logger.debug(f'Running cell reload system {name}')
ret = await s_coro.ornot(func)
await asyncio.sleep(0)
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(f'Error running reload system {name}')
return s_common.retnexc(e)
logger.debug(f'Completed cell reload system {name}')
return (True, ret)
async def addUserApiKey(self, useriden, name, duration=None):
'''
Add an API key for a user.
Notes:
The secret API key is only available once.
Args:
useriden (str): User iden value.
name (str): Name of the API key.
duration (int or None): Duration of time for the API key to be valid ( in milliseconds ).
Returns:
tuple: A tuple of the secret API key value and the API key metadata information.
'''
user = await self.auth.reqUser(useriden)
iden, token, shadow = await s_passwd.generateApiKey()
now = s_common.now()
kdef = {
'iden': iden,
'name': name,
'user': user.iden,
'created': now,
'updated': now,
'shadow': shadow,
}
if duration is not None:
if duration < 1:
raise s_exc.BadArg(mesg='Duration must be greater than or equal to 1.', name='duration')
kdef['expires'] = now + duration
kdef = s_schemas.reqValidUserApiKeyDef(kdef)
await self._push('user:apikey:add', kdef)
logger.info(f'Created HTTP API key {iden} for {user.name}, {name=}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, iden=iden,
status='MODIFY'))
kdef.pop('shadow')
return token, kdef
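# Illustrative sketch (assumed user and duration; s_const.day is assumed to be
# one day in milliseconds): the raw token is only returned once, so callers
# must store it immediately.
#
#     user = await cell.auth.getUserByName('visi')
#     token, kdef = await cell.addUserApiKey(user.iden, 'automation',
#                                            duration=30 * s_const.day)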
@s_nexus.Pusher.onPush('user:apikey:add')
async def _genUserApiKey(self, kdef):
iden = s_common.uhex(kdef.get('iden'))
user = s_common.uhex(kdef.get('user'))
self.slab.put(iden, user, db=self.apikeydb)
lkey = user + b'apikey' + iden
self.slab.put(lkey, s_msgpack.en(kdef), db=self.usermetadb)
async def getUserApiKey(self, iden):
'''
Get a user API key via iden.
Notes:
This contains the raw value. Callers are responsible for removing the ``shadow`` key.
Args:
iden (str): The key iden.
Returns:
dict: The key dictionary, or None.
'''
lkey = s_common.uhex(iden)
user = self.slab.get(lkey, db=self.apikeydb)
if user is None:
return None
buf = self.slab.get(user + b'apikey' + lkey, db=self.usermetadb)
if buf is None: # pragma: no cover
logger.warning(f'Missing API key {iden} from user metadata for {s_common.ehex(user)}')
return None
kdef = s_msgpack.un(buf) # This includes the shadow key
return kdef
async def listUserApiKeys(self, useriden):
'''
Get all the API keys for a user.
Args:
useriden (str): The user iden.
Returns:
list: A list of kdef values.
'''
user = await self.auth.reqUser(useriden)
vals = []
prefix = s_common.uhex(user.iden) + b'apikey'
for lkey, valu in self.slab.scanByPref(prefix, db=self.usermetadb):
kdef = s_msgpack.un(valu)
kdef.pop('shadow')
vals.append(kdef)
await asyncio.sleep(0)
return vals
async def getApiKeys(self):
'''
Get all API keys in the cell.
Yields:
dict: kdef values.
'''
# yield API keys for all users
for keyiden, useriden in self.slab.scanByFull(db=self.apikeydb):
lkey = useriden + b'apikey' + keyiden
valu = self.slab.get(lkey, db=self.usermetadb)
kdef = s_msgpack.un(valu)
kdef.pop('shadow')
yield kdef
await asyncio.sleep(0)
async def checkUserApiKey(self, key):
'''
Check if a user API key is valid.
Notes:
If the key is not valid, the dictionary will contain a ``mesg`` key.
If the key is valid, the dictionary will contain the user def in a ``udef`` key,
and the key metadata in a ``kdef`` key.
Args:
key (str): The API key to check.
Returns:
tuple: A two-item tuple of a boolean indicating whether the key is valid, and an info dictionary.
'''
isok, valu = s_passwd.parseApiKey(key)
if isok is False:
return False, {'mesg': 'API key is malformed.'}
iden, secv = valu
kdef = await self.getUserApiKey(iden)
if kdef is None:
return False, {'mesg': f'API key does not exist: {iden}'}
user = kdef.get('user')
udef = await self.getUserDef(user)
if udef is None: # pragma: no cover
return False, {'mesg': f'User does not exist for API key: {iden}', 'user': user}
if udef.get('locked'):
return False, {'mesg': f'User associated with API key is locked: {iden}',
'user': user, 'name': udef.get('name')}
if ((expires := kdef.get('expires')) is not None):
if s_common.now() > expires:
return False, {'mesg': f'API key is expired: {iden}',
'user': user, 'name': udef.get('name')}
shadow = kdef.pop('shadow')
valid = await s_passwd.checkShadowV2(secv, shadow)
if valid is False:
return False, {'mesg': f'API key shadow mismatch: {iden}',
'user': user, 'name': udef.get('name')}
return True, {'kdef': kdef, 'udef': udef}
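# Illustrative sketch (assumed `key` value sourced from a request header):
# validate a presented API key before handling a request.
#
#     isok, info = await cell.checkUserApiKey(key)
#     if not isok:
#         raise s_exc.AuthDeny(mesg=info.get('mesg'))
#     udef, kdef = info.get('udef'), info.get('kdef')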
async def modUserApiKey(self, iden, key, valu):
'''
Update a value in the user API key metadata.
Args:
iden (str): Iden of the key to update.
key (str): Name of the value to update.
valu: The new value.
Returns:
dict: An updated key metadata dictionary.
'''
if key not in ('name',):
raise s_exc.BadArg(mesg=f'Cannot set {key} on user API keys.', name=key)
kdef = await self.getUserApiKey(iden)
if kdef is None:
raise s_exc.NoSuchIden(mesg=f'User API key does not exist, cannot modify it: {iden}', iden=iden)
useriden = kdef.get('user')
user = await self.auth.reqUser(useriden)
vals = {
'updated': s_common.now()
}
if key == 'name':
vals['name'] = valu
kdef.update(vals)
kdef = s_schemas.reqValidUserApiKeyDef(kdef)
await self._push('user:apikey:edit', kdef.get('user'), iden, vals)
logger.info(f'Updated HTTP API key {iden} for {user.name}, set {key}={valu}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, iden=iden,
status='MODIFY'))
kdef.pop('shadow')
return kdef
@s_nexus.Pusher.onPush('user:apikey:edit')
async def _setUserApiKey(self, user, iden, vals):
lkey = s_common.uhex(user) + b'apikey' + s_common.uhex(iden)
buf = self.slab.get(lkey, db=self.usermetadb)
if buf is None: # pragma: no cover
raise s_exc.NoSuchIden(mesg=f'User API key does not exist: {iden}')
kdef = s_msgpack.un(buf)
kdef.update(vals)
self.slab.put(lkey, s_msgpack.en(kdef), db=self.usermetadb)
return kdef
async def delUserApiKey(self, iden):
'''
Delete an existing API key.
Args:
iden (str): The iden of the API key to delete.
Returns:
bool: True indicating the key was deleted.
'''
kdef = await self.getUserApiKey(iden)
if kdef is None:
raise s_exc.NoSuchIden(mesg=f'User API key does not exist: {iden}')
useriden = kdef.get('user')
user = await self.auth.reqUser(useriden)
ret = await self._push('user:apikey:del', useriden, iden)
logger.info(f'Deleted HTTP API key {iden} for {user.name}',
extra=await self.getLogExtra(target_user=user.iden, target_username=user.name, iden=iden,
status='MODIFY'))
return ret
@s_nexus.Pusher.onPush('user:apikey:del')
async def _delUserApiKey(self, user, iden):
user = s_common.uhex(user)
iden = s_common.uhex(iden)
self.slab.delete(iden, db=self.apikeydb)
self.slab.delete(user + b'apikey' + iden, db=self.usermetadb)
return True
async def _onUserDelEvnt(self, evnt):
# Call callback for handling user:del events
udef = evnt[1].get('udef')
user = udef.get('iden')
ukey = s_common.uhex(user)
for lkey, buf in self.slab.scanByPref(ukey, db=self.usermetadb):
suffix = lkey[16:]
# Special handling for certain meta which has secondary indexes
if suffix.startswith(b'apikey'):
key_iden = suffix[6:]
self.slab.delete(key_iden, db=self.apikeydb)
self.slab.delete(lkey, db=self.usermetadb)
await asyncio.sleep(0)
def _makeCachedSslCtx(self, opts):
opts = dict(opts)
cadir = self.conf.get('tls:ca:dir')
if cadir is not None:
sslctx = s_common.getSslCtx(cadir, purpose=ssl.Purpose.SERVER_AUTH)
else:
sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
if not opts['verify']:
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
# crypto functions require reading certs/keys from disk so make a temp dir
# to save any certs/keys to disk so they can be read.
with self.getTempDir() as tmpdir:
if opts.get('ca_cert'):
ca_cert = opts.get('ca_cert').encode()
with tempfile.NamedTemporaryFile(dir=tmpdir, mode='wb', delete=False) as fh:
fh.write(ca_cert)
try:
sslctx.load_verify_locations(cafile=fh.name)
except Exception as e: # pragma: no cover
raise s_exc.BadArg(mesg=f'Error loading CA cert: {str(e)}') from None
if not opts['client_cert']:
return sslctx
client_cert = opts['client_cert'].encode()
if opts['client_key']:
client_key = opts['client_key'].encode()
else:
client_key = None
client_key_path = None
with tempfile.NamedTemporaryFile(dir=tmpdir, mode='wb', delete=False) as fh:
fh.write(client_cert)
client_cert_path = fh.name
if client_key:
with tempfile.NamedTemporaryFile(dir=tmpdir, mode='wb', delete=False) as fh:
fh.write(client_key)
client_key_path = fh.name
try:
sslctx.load_cert_chain(client_cert_path, keyfile=client_key_path)
except ssl.SSLError as e:
raise s_exc.BadArg(mesg=f'Error loading client cert: {str(e)}') from None
return sslctx
def getCachedSslCtx(self, opts=None, verify=None):
if opts is None:
opts = {}
if verify is not None:
opts['verify'] = verify
opts = s_schemas.reqValidSslCtxOpts(opts)
key = tuple(sorted(opts.items()))
return self._sslctx_cache.get(key)
async def freeze(self, timeout=30):
if self.paused:
mesg = 'The service is already frozen.'
raise s_exc.BadState(mesg=mesg)
logger.warning('Freezing service for volume snapshot.')
logger.warning('...acquiring nexus lock to prevent edits.')
try:
await asyncio.wait_for(self.nexslock.acquire(), timeout=timeout)
except asyncio.TimeoutError:
logger.warning('...nexus lock acquire timed out!')
logger.warning('Aborting freeze and resuming normal operation.')
mesg = 'Nexus lock acquire timed out.'
raise s_exc.TimeOut(mesg=mesg)
self.paused = True
try:
logger.warning('...committing pending transactions.')
await self.slab.syncLoopOnce()
logger.warning('...flushing dirty buffers to disk.')
await s_task.executor(os.sync)
logger.warning('...done!')
except Exception:
self.paused = False
self.nexslock.release()
logger.exception('Failed to freeze. Resuming normal operation.')
raise
async def resume(self):
if not self.paused:
mesg = 'The service is not frozen.'
raise s_exc.BadState(mesg=mesg)
logger.warning('Resuming normal operations from a freeze.')
self.paused = False
self.nexslock.release()
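# Illustrative sketch (snapshot helper is hypothetical): wrap a volume
# snapshot with freeze()/resume() so no nexus edits or dirty buffers race
# the snapshot.
#
#     await cell.freeze(timeout=60)
#     try:
#         await take_volume_snapshot()    # hypothetical
#     finally:
#         await cell.resume()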