An RMI framework for synapse.
import os
import ssl
import copy
import time
import yaml
import asyncio
import logging
import contextlib
import collections
import synapse.exc as s_exc
import synapse.glob as s_glob
import synapse.common as s_common
import synapse.lib.base as s_base
import synapse.lib.coro as s_coro
import synapse.lib.link as s_link
import synapse.lib.queue as s_queue
import synapse.lib.certdir as s_certdir
import synapse.lib.threads as s_threads
import synapse.lib.urlhelp as s_urlhelp
import synapse.lib.version as s_version
import synapse.lib.hashitem as s_hashitem
logger = logging.getLogger(__name__)
televers = (3, 0)
aha_clients = {}
async def addAhaUrl(url):
Add (incref) an aha registry URL.
NOTE: You may also add a list of redundant URLs.
hkey = s_hashitem.normitem(url)
info = aha_clients.get(hkey)
if info is None:
info = aha_clients[hkey] = {'refs': 0, 'client': None, 'url': url}
info['refs'] += 1
return info
async def delAhaUrl(url):
Remove (decref) an aha registry URL.
NOTE: You may also remove a list of redundant URLs.
hkey = s_hashitem.normitem(url)
info = aha_clients.get(hkey)
if info is None:
return 0
info['refs'] -= 1
refs = info['refs']
if refs == 0:
client = info.get('client')
if client is not None:
await client.fini()
aha_clients.pop(hkey, None)
return refs
def zipurl(info):
Reconstruct a URL string from a parsed telepath info dict.
# copy to prevent mutation
info = dict(info)
host = info.pop('host', None)
port = info.pop('port', None)
path = info.pop('path', None)
user = info.pop('user', None)
passwd = info.pop('passwd', None)
scheme = info.pop('scheme', None)
url = f'{scheme}://'
if user:
url += user
if passwd:
url += f':{passwd}'
url += '@'
if host:
url += host
if port is not None:
url += f':{port}'
if path:
url += f'{path}'
if info:
params = '&'.join([f'{k}={v}' for (k, v) in info.items()])
url += f'?{params}'
return url
def modurl(url, **info):
if isinstance(url, str):
urlinfo = chopurl(url)
for k, v in info.items():
urlinfo[k] = v
return zipurl(urlinfo)
return [modurl(u, **info) for u in url]
def mergeAhaInfo(info0, info1):
# info0 - local urlinfo
# info1 - urlinfo provided by aha
# copy both to prevent mutation
info0 = copy.deepcopy(info0)
info1 = copy.deepcopy(info1)
# local path wins
info1.pop('path', None)
# local user wins if specified
if info0.get('user') is not None:
info1.pop('user', None)
# upstream wins everything else
return info0
async def open(url, onlink=None):
Open a new telepath ClientV2 object based on the given URL.
url (str): The URL to connect to.
onlink: An optional async callback function to run when connections are made.
The onlink callback function has the call signature ``(proxy, urlinfo)``.
The proxy is the Telepath Proxy object.
The urlinfo is the parsed URL information used to create the proxy object.
The urlinfo structure may change between versions of Synapse.
ClientV2: A ClientV2 object.
return await ClientV2.anit(url, onlink=onlink)
async def _getAhaSvc(urlinfo, timeout=None):
host = urlinfo.get('host')
if host is None:
mesg = f'AHA urlinfo has no host: {urlinfo}'
raise s_exc.NoSuchName(mesg=mesg)
if not aha_clients:
mesg = f'No aha servers registered to lookup {host}'
raise s_exc.NotReady(mesg=mesg)
errs = []
for ahaurl, cnfo in list(aha_clients.items()):
client = cnfo.get('client')
if client is None:
client = await Client.anit(ahaurl)
client._fini_atexit = True
cnfo['client'] = client
proxy = await client.proxy(timeout=timeout)
cellinfo = await s_common.wait_for(proxy.getCellInfo(), timeout=5)
kwargs = {}
synvers = cellinfo['synapse']['version']
if synvers >= (2, 95, 0):
kwargs['filters'] = {
'mirror': bool(s_common.yamlloads(urlinfo.get('mirror', 'false'))),
ahasvc = await s_common.wait_for(proxy.getAhaSvc(host, **kwargs), timeout=5)
if ahasvc is None:
svcinfo = ahasvc.get('svcinfo', {})
if not svcinfo.get('online'):
return client, ahasvc
except Exception as e:
if isinstance(ahaurl, str):
surl = s_urlhelp.sanitizeUrl(ahaurl)
surl = tuple([s_urlhelp.sanitizeUrl(u) for u in ahaurl])
logger.exception(f'Unable to get aha client ({surl})')
if errs:
raise errs[-1]
mesg = f'aha lookup failed: {host}'
raise s_exc.NoSuchName(mesg=mesg)
async def getAhaProxy(urlinfo):
Return a telepath proxy by looking up a host from an AHA registry.
ahaclient, ahasvc = await _getAhaSvc(urlinfo, timeout=5)
svcinfo = ahasvc.get('svcinfo', {})
svcurlinfo = mergeAhaInfo(urlinfo, svcinfo.get('urlinfo', {}))
return await openinfo(svcurlinfo)
async def withTeleEnv():
async with loadTeleCell('/vertex/storage'):
yamlpath = s_common.getSynPath('telepath.yaml')
telefini = await loadTeleEnv(yamlpath)
certpath = s_common.getSynPath('certs')
if telefini is not None:
await telefini()
async def loadTeleEnv(path):
path = s_common.genpath(path)
if not os.path.isfile(path):
conf = s_common.yamlload(path)
vers = conf.get('version')
if vers != 1:
logger.warning(f'telepath.yaml unknown version: {vers}')
ahas = conf.get('aha:servers', ())
cdirs = conf.get('certdirs', ())
for a in ahas:
await addAhaUrl(a)
for p in cdirs:
async def fini():
for a in ahas:
await delAhaUrl(a)
for p in cdirs:
return fini
async def loadTeleCell(dirn):
certpath = s_common.genpath(dirn, 'certs')
confpath = s_common.genpath(dirn, 'cell.yaml')
usecerts = os.path.isdir(certpath)
ahaurl = None
if os.path.isfile(confpath):
conf = s_common.yamlload(confpath)
if conf is not None:
ahaurl = conf.get('aha:registry')
if usecerts:
if ahaurl:
await addAhaUrl(ahaurl)
if usecerts:
if ahaurl:
await delAhaUrl(ahaurl)
class Aware:
The telepath.Aware mixin allows shared objects to
handle individual links managed by the Daemon.
async def getTeleApi(self, link, mesg, path):
Return a shared object for this link.
link (synapse.lib.link.Link): A network link.
mesg ((str,dict)): The tele:syn handshake message.
return self
async def getTeleFeats(self):
return {}
def onTeleShare(self, dmon, name):
class Task:
A telepath Task is used to internally track calls/responses.
def __init__(self):
self.retn = None
self.iden = s_common.guid()
self.done = asyncio.Event()
async def result(self):
await self.done.wait()
return self.retn
def reply(self, retn):
self.retn = retn
class Share(s_base.Base):
The telepath client side of a dynamically shared object.
async def __anit__(self, proxy, iden, sharinfo=None):
await s_base.Base.__anit__(self)
self.iden = iden
self.proxy = proxy
if sharinfo is None:
sharinfo = {}
self.sharinfo = sharinfo
self.methinfo = sharinfo.get('meths', {})
self.proxy.shares[iden] = self
self.txfini = True
async def _txShareFini(self):
self.proxy.shares.pop(self.iden, None)
if not self.txfini:
mesg = ('share:fini', {'share': self.iden})
if not self.proxy.link.isfini:
await self.proxy.link.tx(mesg)
def __getattr__(self, name):
info = self.methinfo.get(name)
if info is not None and info.get('genr'):
meth = GenrMethod(self.proxy, name, share=self.iden)
setattr(self, name, meth)
return meth
meth = Method(self.proxy, name, share=self.iden)
setattr(self, name, meth)
return meth
def __enter__(self):
Convenience function to enable using Proxy objects as synchronous context managers.
This should never be used by synapse core code. This is for sync client code convenience only.
if s_threads.iden() == self.tid:
raise s_exc.SynErr('Use of synchronous context manager in async code')
self._ctxobj = self.schedCoroSafePend(self.__aenter__())
return self
def __exit__(self, *args):
This should never be used by synapse core code. This is for sync client code convenience only.
return self.schedCoroSafePend(self._ctxobj.__aexit__(*args))
class Genr(Share):
async def __anit__(self, proxy, iden):
await Share.__anit__(self, proxy, iden, sharinfo={})
self.queue = await s_queue.AQueue.anit()
async def _onShareData(self, data):
async def __aiter__(self):
while not self.isfini:
for retn in await self.queue.slice():
if retn is None:
yield s_common.result(retn)
raise s_exc.LinkShutDown(mesg='Remote peer disconnected')
await self.fini()
def __iter__(self):
while not self.isfini:
for retn in s_glob.sync(self.queue.slice()):
if retn is None:
yield s_common.result(retn)
sharetypes = {
'share': Share,
'genr': Genr,
class Method:
The telepath Method is used to provide proxy method calls.
def __init__(self, proxy, name, share=None):
self.name = name
self.share = share
self.proxy = proxy
# act as much like a bound method as possible...
self.__name__ = name
self.__self__ = proxy
async def __call__(self, *args, **kwargs):
todo = (self.name, args, kwargs)
return await self.proxy.task(todo, name=self.share)
class GenrIter:
An object to help delay a telepath call until iteration.
def __init__(self, proxy, todo, share):
self.todo = todo
self.proxy = proxy
self.share = share
async def list(self):
return [x async for x in self]
async def __aiter__(self):
genr = await self.proxy.task(self.todo, name=self.share)
if genr is None:
async for item in genr:
yield item
await asyncio.sleep(0)
def __iter__(self):
genr = s_glob.sync(self.proxy.task(self.todo, name=self.share))
for item in genr:
yield item
class GenrMethod(Method):
def __call__(self, *args, **kwargs):
todo = (self.name, args, kwargs)
return GenrIter(self.proxy, todo, self.share)
class Pipeline(s_base.Base):
async def __anit__(self, proxy, genr, name=None):
s_common.deprecated('Telepath.Pipeline', curv='2.167.0')
await s_base.Base.__anit__(self)
self.genr = genr
self.name = name
self.proxy = proxy
self.count = 0
self.link = await proxy.getPoolLink()
self.task = self.schedCoro(self._runGenrLoop())
self.taskexc = None
async def _runGenrLoop(self):
async for todo in self.genr:
mesg = ('t2:init', {
'todo': todo,
'name': self.name,
'sess': self.proxy.sess})
await self.link.tx(mesg)
self.count += 1
except asyncio.CancelledError:
except Exception as e:
self.taskexc = e
await self.link.fini()
async def __aiter__(self):
taskdone = False
while not self.isfini:
if not taskdone and self.task.done():
taskdone = True
if taskdone and self.count == 0:
if not self.link.isfini:
await self.proxy._putPoolLink(self.link)
await self.fini()
mesg = await self.link.rx()
if self.taskexc:
raise self.taskexc
if mesg is None:
raise s_exc.LinkShutDown(mesg='Remote peer disconnected')
if mesg[0] == 't2:fini':
self.count -= 1
yield mesg[1].get('retn')
logger.warning(f'Pipeline got unhandled message: {mesg!r}.') # pragma: no cover
class Proxy(s_base.Base):
A telepath Proxy is used to call remote APIs on a shared object.
import synapse.telepath as s_telepath
# open the "foo" object shared in a dmon on localhost:3344
async def doFooThing():
proxy = await s_telepath.openurl('tcp://')
valu = await proxy.getFooValu(x, y)
The proxy (and openurl function) may also be used from sync code:
proxy = s_telepath.openurl('tcp://')
valu = proxy.getFooValu(x, y)
async def __anit__(self, link, name):
await s_base.Base.__anit__(self)
self.tid = s_threads.iden()
self.link = link
self.name = name
self.tasks = {}
self.shares = {}
self._ahainfo = {}
self._features = {}
self.sharinfo = {}
self.methinfo = {}
self.sess = None
self.links = collections.deque()
self._link_poolsize = 4
self.synack = None
self.syndone = asyncio.Event()
self.handlers = {
'task:fini': self._onTaskFini,
'share:data': self._onShareData,
'share:fini': self._onShareFini,
async def fini():
for item in list(self.shares.values()):
await item.fini()
mesg = ('task:fini', {'retn': (False, ('IsFini', {}))})
for name, task in list(self.tasks.items()):
del self.tasks[name]
for link in self.links:
await link.fini()
del self.syndone
await self.link.fini()
def _hasTeleFeat(self, name, vers=1):
return self._features.get(name, 0) >= vers
def _hasTeleMeth(self, name):
return self.methinfo.get(name) is not None
def _getSynVers(self):
Helper method to retrieve the remote Synapse version from Proxy.
This will return None if the synapse version was not supplied
during the Telepath handshake.
tuple: A tuple of major, minor, patch information as integers.
version = self.sharinfo.get('syn:version')
return version
def _getSynCommit(self):
Helper method to retrieve the remote Synapse commit hash from Proxy.
This will return None if the synapse commit hash was not supplied
during the Telepath handshake.
str: A string containing the commit hash. This may be a empty string.
return self.sharinfo.get('syn:commit')
def _getClasses(self):
Helper method to retrieve the classes that comprise the remote object.
This will return None if the class version was not supplied
during the Telepath handshake.
tuple: A tuple of strings containing the class paths for the remote object.
classes = self.sharinfo.get('classes')
return classes
async def getPoolLink(self):
while self.links and not self.isfini:
link = self.links.popleft()
if link.isfini:
return link
# we need a new one...
return await self._initPoolLink()
async def getPipeline(self, genr, name=None):
Construct a proxy API call pipeline in order to make
multiple telepath API calls while minimizing round trips.
genr (async generator): An async generator that yields todo tuples.
name (str): The name of the shared object on the daemon.
def genr():
yield s_common.todo('getFooByBar', 10)
yield s_common.todo('getFooByBar', 20)
for retn in proxy.getPipeline(genr()):
valu = s_common.result(retn)
async with await Pipeline.anit(self, genr, name=name) as pipe:
async for retn in pipe:
yield retn
async def _initPoolLink(self):
# TODO loop / backoff
if self.link.get('unix'):
path = self.link.get('path')
link = await s_link.unixconnect(path)
ssl = self.link.get('ssl')
host = self.link.get('host')
port = self.link.get('port')
link = await s_link.connect(host, port, ssl=ssl)
return link
async def _putPoolLink(self, link):
if link.isfini:
# If we've exceeded our poolsize, discard the current link.
if len(self.links) >= self._link_poolsize:
return await link.fini()
def __enter__(self):
Convenience function to enable using Proxy objects as synchronous context managers.
This must not be used from async code, and it should never be used in core synapse code.
if s_threads.iden() == self.tid:
raise s_exc.SynErr('Use of synchronous context manager in async code')
self._ctxobj = self.schedCoroSafePend(self.__aenter__())
return self
def __exit__(self, *args):
This should never be used by core synapse code.
return self.schedCoroSafePend(self._ctxobj.__aexit__(*args))
async def _onShareFini(self, mesg):
iden = mesg[1].get('share')
share = self.shares.get(iden)
if share is None:
share.txfini = False
await share.fini()
async def _onShareData(self, mesg):
data = mesg[1].get('data')
iden = mesg[1].get('share')
share = self.shares.get(iden)
if share is None:
await share._onShareData(data)
async def call(self, methname, *args, **kwargs):
Call a remote method by name.
methname (str): The name of the remote method.
*args: Arguments to the method call.
**kwargs: Keyword arguments to the method call.
Most use cases will likely use the proxy methods directly:
The following two are effectively the same:
valu = proxy.getFooBar(x, y)
valu = proxy.call('getFooBar', x, y)
todo = (methname, args, kwargs)
return await self.task(todo)
async def taskv2(self, todo, name=None):
mesg = ('t2:init', {
'todo': todo,
'name': name,
'sess': self.sess})
link = await self.getPoolLink()
await link.tx(mesg)
mesg = await link.rx()
if mesg is None:
raise s_exc.LinkShutDown(mesg='Remote peer disconnected')
if mesg[0] == 't2:fini':
await self._putPoolLink(link)
retn = mesg[1].get('retn')
return s_common.result(retn)
if mesg[0] == 't2:genr':
async def genrloop():
while True:
mesg = await link.rx()
if mesg is None:
raise s_exc.LinkShutDown(mesg='Remote peer disconnected')
if mesg[0] != 't2:yield': # pragma: no cover
info = 'Telepath protocol violation: unexpected message received'
raise s_exc.BadMesgFormat(mesg=info)
retn = mesg[1].get('retn')
if retn is None:
await self._putPoolLink(link)
# if this is an exception, it's the end...
if not retn[0]:
await self._putPoolLink(link)
yield s_common.result(retn)
except GeneratorExit:
# if they bail early on the genr, fini the link
await link.fini()
return s_coro.GenrHelp(genrloop())
if mesg[0] == 't2:share':
iden = mesg[1].get('iden')
sharinfo = mesg[1].get('sharinfo')
await self._putPoolLink(link)
return await Share.anit(self, iden, sharinfo)
async def task(self, todo, name=None):
if self.isfini:
raise s_exc.IsFini(mesg='Telepath Proxy isfini')
if self.sess is not None:
return await self.taskv2(todo, name=name)
s_common.deprecated('Telepath task with no session', curv='2.166.0')
task = Task()
mesg = ('task:init', {
'task': task.iden,
'todo': todo,
'name': name, })
self.tasks[task.iden] = task
await self.link.tx(mesg)
retn = await task.result()
return s_common.result(retn)
self.tasks.pop(task.iden, None)
async def handshake(self, auth=None):
mesg = ('tele:syn', {
'auth': auth,
'vers': televers,
'name': self.name,
await self.link.tx(mesg)
self.synack = await self.link.rx()
if self.synack is None:
mesg = 'socket closed by server before handshake'
raise s_exc.LinkShutDown(mesg=mesg)
self.sess = self.synack[1].get('sess')
self._ahainfo = self.synack[1].get('ahainfo', {})
self._features = self.synack[1].get('features', {})
self.sharinfo = self.synack[1].get('sharinfo', {})
self.methinfo = self.sharinfo.get('meths', {})
vers = self.synack[1].get('vers')
if vers[0] != televers[0]:
raise s_exc.BadMesgVers(myver=televers, hisver=vers)
async def rxloop():
while not self.link.isfini:
mesg = await self.link.rx()
if mesg is None:
func = self.handlers.get(mesg[0])
if func is None:
logger.warning('Proxy.rxloop: Invalid Message: %r' % (mesg,))
await func(mesg)
except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
except Exception:
logger.exception('Proxy.rxloop for %r' % (mesg,))
retn = self.synack[1].get('retn')
valu = s_common.result(retn)
return valu
async def _txShareExc(self, iden):
# send a share:fini for an unhandled share.
await self.link.tx(
('share:fini', {'share': iden, 'isexc': True})
async def _onTaskFini(self, mesg):
# handle task:fini message
iden = mesg[1].get('task')
task = self.tasks.pop(iden, None)
if task is None:
logger.warning('task:fini for invalid task: %r' % (iden,))
retn = mesg[1].get('retn')
type = mesg[1].get('type')
if type is None:
return task.reply(retn)
ctor = sharetypes.get(type, Share)
item = await ctor.anit(self, retn[1])
return task.reply((True, item))
def __getattr__(self, name):
info = self.methinfo.get(name)
if info is not None and info.get('genr'):
meth = GenrMethod(self, name)
setattr(self, name, meth)
return meth
meth = Method(self, name)
setattr(self, name, meth)
return meth
class ClientV2(s_base.Base):
A telepath client which:
* connects to multiple services
* distributes API calls across them
* receives topology updates from AHA
NOTE: This must co-exist with Client until we eliminate uses that
attempt to call telepath APIs directly from the Client rather
than awaiting a proxy()
async def __anit__(self, urlinfo, onlink=None):
await s_base.Base.__anit__(self)
self.aha = None
self.clients = {}
self.proxies = set()
self.poolname = None
self.onlink = onlink
self.bootdeque = collections.deque()
self.ready = asyncio.Event()
self.deque = collections.deque()
self.mesghands = {
'svc:add': self._onPoolSvcAdd,
'svc:del': self._onPoolSvcDel,
async def fini():
await self._shutDownPool()
# wake the sleepers
def setBootUrls(self, urlinfo):
if not isinstance(urlinfo, (list, tuple)):
urlinfo = (urlinfo,)
self.booturls = [chopurl(u) for u in urlinfo]
def getNextBootUrl(self):
if not self.bootdeque:
return self.bootdeque.popleft()
async def _initBootProxy(self):
lastlog = 0.0
while not self.isfini:
urlinfo = self.getNextBootUrl()
if urlinfo.get('scheme') == 'aha':
self.aha, svcinfo = await _getAhaSvc(urlinfo)
# if the service is a pool, enter pool mode and fire
# the topography sync task to manage pool members.
services = svcinfo.get('services')
if services is not None:
# we are an AHA pool!
if self.poolname is None:
self.poolname = svcinfo.get('name')
# regular telepath client behavior
proxy = await openinfo(urlinfo)
async def reconnect():
if not self.isfini:
await self._onPoolLink(proxy, urlinfo)
except Exception as e:
now = time.monotonic()
if now > lastlog + 60.0: # don't logspam the disconnect message more than 1/min
url = s_urlhelp.sanitizeUrl(zipurl(urlinfo))
logger.exception(f'telepath clientv2 ({url}) encountered an error: {e}')
lastlog = now
retrysleep = float(urlinfo.get('retrysleep', 0.2))
await self.waitfini(timeout=retrysleep)
async def waitready(self, timeout=None):
await s_common.wait_for(self.ready.wait(), timeout=timeout)
def size(self):
return len(self.proxies)
async def _onPoolSvcAdd(self, mesg):
svcname = mesg[1].get('name')
if (oldc := self.clients.pop(svcname, None)) is not None:
await oldc.fini()
urlinfo = {'scheme': 'aha', 'host': svcname, 'path': ''}
self.clients[svcname] = await ClientV2.anit(urlinfo, onlink=self._onPoolLink)
await self.fire('svc:add', **mesg[1])
async def _onPoolSvcDel(self, mesg):
svcname = mesg[1].get('name')
client = self.clients.pop(svcname, None)
if client is not None:
await client.fini()
await self.fire('svc:del', **mesg[1])
async def _onPoolLink(self, proxy, urlinfo):
async def onfini():
if proxy in self.proxies:
if proxy in self.deque:
if not len(self.proxies):
if self.onlink is not None:
await self.onlink(proxy, urlinfo)
except Exception as e:
logger.exception(f'onlink: {self.onlink}')
async def _shutDownPool(self):
# when we reconnect to our AHA service, we need to dump the current
# topology state and gather it again.
for client in list(self.clients.values()):
await client.fini()
for proxy in list(self.proxies):
await proxy.fini()
async def _toposync(self):
async def reset():
await self._shutDownPool()
await self.fire('pool:reset')
while not self.isfini:
ahaproxy = await self.aha.proxy()
await reset()
async for mesg in ahaproxy.iterPoolTopo(self.poolname):
hand = self.mesghands.get(mesg[0])
if hand is None: # pragma: no cover
logger.warning(f'Unknown AHA pool topography message: {mesg}')
await hand(mesg)
except Exception as e:
logger.warning(f'AHA pool topology task restarting: {e}')
await self.waitfini(timeout=1)
async def proxy(self, timeout=None):
async def getNextProxy():
while not self.isfini:
await self.ready.wait()
if self.isfini: # pragma: no cover
raise s_exc.IsFini()
if not self.deque:
if not self.deque:
return self.deque.popleft()
# use an inner function so we can wait overall...
return await s_common.wait_for(getNextProxy(), timeout)
class Client(s_base.Base):
A Telepath client object which reconnects and allows waiting for link up.
The conf data allows changing parameters such as timeouts, retry period, and link pool size. The default
conf data can be seen below::
conf = {
'timeout': 10,
'retrysleep': 0.2,
'link_poolsize': 4,
async def __anit__(self, urlinfo, opts=None, conf=None, onlink=None):
await s_base.Base.__anit__(self)
if conf is None:
conf = {}
if opts is None:
opts = {}
self._t_urldeque = collections.deque()
self._t_opts = opts
self._t_conf = conf
self._t_proxy = None
self._t_ready = asyncio.Event()
self._t_onlinks = []
self._t_methinfo = None
self._t_named_meths = set()
if onlink is not None:
async def fini():
if self._t_proxy is not None:
await self._t_proxy.fini()
# Wake any waiters which may be waiting on waitready() calls so those
# without timeouts specified are not waiting forever.
await self._fireLinkLoop()
def setBootUrls(self, urlinfo):
if not isinstance(urlinfo, (list, tuple)):
urlinfo = (urlinfo,)
self._t_urlinfo = [chopurl(u) for u in urlinfo]
def _getNextUrl(self):
if not self._t_urldeque:
return self._t_urldeque.popleft()
async def onlink(self, func):
if self._t_proxy:
await func(self._t_proxy)
async def offlink(self, func):
async def _fireLinkLoop(self):
self._t_proxy = None
await self.fire('tele:client:linkloop')
async def _teleLinkLoop(self):
lastlog = 0.0
while not self.isfini:
urlinfo = self._getNextUrl()
await self._initTeleLink(urlinfo)
except Exception as e:
now = time.monotonic()
if now > lastlog + 60.0: # don't logspam the disconnect message more than 1/min
url = s_urlhelp.sanitizeUrl(zipurl(urlinfo))
logger.warning(f'telepath client ({url}) encountered an error: {e}', exc_info=e)
lastlog = now
await self.waitfini(timeout=self._t_conf.get('retrysleep', 0.2))
async def proxy(self, timeout=10):
await self.waitready(timeout=timeout)
ret = self._t_proxy
if ret is None or ret.isfini is True: # pragma: no cover
raise s_exc.IsFini(mesg='Telepath Client Proxy is not available.')
return ret
async def _initTeleLink(self, urlinfo):
info = urlinfo.copy()
self._t_proxy = await openinfo(info)
self._t_methinfo = self._t_proxy.methinfo
self._t_proxy._link_poolsize = self._t_conf.get('link_poolsize', 4)
async def fini():
if self._t_named_meths:
for name in self._t_named_meths:
delattr(self, name)
if not self.isfini:
await self._fireLinkLoop()
for onlink in self._t_onlinks:
await onlink(self._t_proxy)
# in case the callback fini()s the proxy
if self._t_proxy is None:
except Exception as e:
logger.exception(f'onlink: {onlink}')
async def task(self, todo, name=None):
# implement the main workhorse method for a proxy to allow Method
# objects to use us as the proxy.
proxy = await self.proxy()
return await proxy.task(todo, name=name)
async def waitready(self, timeout=10):
await s_common.wait_for(self._t_ready.wait(), self._t_conf.get('timeout', timeout))
def __getattr__(self, name):
if self._t_methinfo is None:
raise s_exc.NotReady(mesg='Must call waitready() on Client before first method call')
info = self._t_methinfo.get(name)
if info is not None and info.get('genr'):
meth = GenrMethod(self, name)
setattr(self, name, meth)
return meth
meth = Method(self, name)
setattr(self, name, meth)
return meth
def _getSynVers(self):
Helper method to retrieve the remote Synapse version from Client
for the currently connected Proxy.
This will return None if the synapse version was not supplied
during the Telepath handshake.
tuple: A tuple of major, minor, patch information as integers.
return self._t_proxy._getSynVers()
def _getSynCommit(self):
Helper method to retrieve the remote Synapse commit hash from Proxy.
This will return None if the synapse commit hash was not supplied
during the Telepath handshake.
str: A string containing the commit hash. This may be a empty string.
return self._t_proxy._getSynCommit()
def _getClasses(self):
Helper method to retrieve the classes that comprise the remote object
for the currently connected Proxy.
This will return None if the class version was not supplied
during the Telepath handshake.
tuple: A tuple of strings containing the class paths for the remote object.
return self._t_proxy._getClasses()
def alias(name):
Resolve a telepath alias via ~/.syn/aliases.yaml
name (str): Name of the alias to resolve.
An exact match against the aliases will always be returned first.
If no exact match is found and the name contains a '/' in it, the
value before the slash is looked up and the remainder of the path
is joined to any result. This is done to support dynamic Telepath
share names.
str: The url string, if present in the alias. None will be returned
if there are no matches.
path = s_common.getSynPath('aliases.yaml')
if not os.path.isfile(path):
return None
conf = s_common.yamlload(path)
# Is there an exact match - if so, return it.
url = conf.get(name)
if url:
return url
# Since telepath supports dynamic shared object access,
# slice a name at the first '/', look up using that value
# and then append the second value to it.
dynname = None
if '/' in name:
name, dynname = name.split('/', 1)
url = conf.get(name)
if url and dynname:
url = '/'.join([url, dynname])
return url
async def openurl(url, **opts):
Open a URL to a remote telepath object.
url (str): A telepath URL.
**opts (dict): Telepath connect options.
(synapse.telepath.Proxy): A telepath proxy object.
The telepath proxy may then be used for sync or async calls:
proxy = openurl(url)
value = proxy.getFooThing()
... or ...
proxy = await openurl(url)
valu = await proxy.getFooThing()
... or ...
async with await openurl(url) as proxy:
valu = await proxy.getFooThing()
info = chopurl(url, **opts)
return await openinfo(info)
def chopurl(url, **opts):
if isinstance(url, str):
if url.find('://') == -1:
newurl = alias(url)
if newurl is None:
raise s_exc.BadUrl(mesg=f':// not found in [{url}] and no alias found!',
url = newurl
info = s_urlhelp.chopurl(url)
# flatten query params into info
query = info.pop('query', None)
if query is not None:
elif isinstance(url, dict):
info = dict(url)
mesg = 'telepath.chopurl() requires a str or dict.'
raise s_exc.BadArg(mesg)
return info
class TeleSSLObject(ssl.SSLObject):
def do_handshake(self):
# steal a reference for later so we can get the cert
self.context.telessl = self
return ssl.SSLObject.do_handshake(self)
async def openinfo(info):
scheme = info.get('scheme')
if scheme == 'aha':
return await getAhaProxy(info)
if '+' in scheme:
scheme, disc = scheme.split('+', 1)
if disc == 'consul': # pragma: no cover
raise s_exc.FeatureNotSupported(mesg='Consul is no longer supported.')
raise s_exc.BadUrl(mesg=f'Unknown discovery protocol [{disc}].',
host = info.get('host')
port = info.get('port')
auth = None
user = info.get('user')
if user is not None:
passwd = info.get('passwd')
auth = (user, {'passwd': passwd})
if scheme == 'cell':
# cell:///path/to/celldir:share
# cell://rel/path/to/celldir:share
path = info.get('path')
name = info.get('name', '*')
# support cell://<relpath>/<to>/<cell>
# by detecting host...
host = info.get('host')
if host:
path = path.strip('/')
path = os.path.join(host, path)
if ':' in path:
path, name = path.split(':')
full = os.path.join(path, 'sock')
link = await s_link.unixconnect(full)
elif scheme == 'unix':
# unix:///path/to/sock:share
name = '*'
path = info.get('path')
if ':' in path:
path, name = path.split(':')
link = await s_link.unixconnect(path)
elif scheme in ('tcp', 'ssl'):
path = info.get('path')
name = info.get('name', path[1:])
if port is None:
port = 27492
hostname = None
sslctx = None
linkinfo = {}
if scheme == 'ssl':
certdir = info.get('certdir')
certhash = info.get('certhash')
certname = info.get('certname')
hostname = info.get('hostname', host)
linkinfo['certhash'] = certhash
linkinfo['hostname'] = hostname
if certdir is None:
certdir = s_certdir.getCertDir()
# if a TLS connection specifies a user with no password
# attempt to auto-resolve a user certificate for the given
# host/network.
if certname is None and user is not None and passwd is None:
certname = f'{user}@{hostname}'
if certhash is None:
sslctx = certdir.getClientSSLContext(certname=certname)
sslctx = ssl.create_default_context()
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
sslctx.sslobject_class = TeleSSLObject
# do hostname checking manually to avoid DNS lookups
# ( to support dynamic IP addresses on services )
sslctx.check_hostname = False
linkinfo['ssl'] = sslctx
link = await s_link.connect(host, port, linkinfo=linkinfo)
raise s_exc.BadUrl(mesg=f'Invalid URL scheme: {scheme}')
prox = await Proxy.anit(link, name)
await prox.handshake(auth=auth)
except (asyncio.CancelledError, Exception):
await prox.fini()
return prox