import json
import base64
import asyncio
import logging
from urllib.parse import urlparse
import tornado.web as t_web
import tornado.websocket as t_websocket
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.base as s_base
import synapse.lib.msgpack as s_msgpack
import synapse.lib.hiveauth as s_hiveauth
import synapse.lib.crypto.passwd as s_passwd
logger = logging.getLogger(__name__)
[docs]class Sess(s_base.Base):
async def __anit__(self, cell, iden, info):
await s_base.Base.__anit__(self)
self.user = None
self.socks = set()
self.cell = cell
# for transient session info
self.locl = {}
# for persistent session info
self.iden = iden
self.info = info
user = self.info.get('user')
if user is not None:
self.user = self.cell.auth.user(user)
[docs] async def set(self, name, valu):
await self.cell.setHttpSessInfo(self.iden, name, valu)
self.info[name] = valu
[docs] async def login(self, user):
self.user = user
await self.set('user', user.iden)
await self.fire('sess:login')
[docs] async def logout(self):
self.user = None
await self.set('user', None)
await self.fire('sess:logout')
[docs] def addWebSock(self, sock):
self.socks.add(sock)
[docs] def delWebSock(self, sock):
self.socks.discard(sock)
[docs]class HandlerBase:
[docs] def initialize(self, cell):
self.cell = cell
self._web_sess = None
self._web_user = None
# this can't live in set_default_headers() due to call ordering in tornado
headers = self.getCustomHeaders()
if headers is not None:
for name, valu in headers.items():
self.add_header(name, valu)
[docs] def getCustomHeaders(self):
return self.cell.conf.get('https:headers')
[docs] def set_default_headers(self):
self.clear_header('Server')
self.add_header('X-Content-Type-Options', 'nosniff')
origin = self.request.headers.get('origin')
if origin is not None and self.isOrigHost(origin):
self.add_header('Access-Control-Allow-Origin', origin)
self.add_header('Access-Control-Allow-Credentials', 'true')
self.add_header('Access-Control-Allow-Headers', 'Content-Type')
[docs] def getAuthCell(self):
'''
Return a reference to the cell used for auth operations.
'''
return self.cell
[docs] def options(self):
self.set_status(204)
self.finish()
[docs] def isOrigHost(self, origin):
host = urlparse(origin).hostname
hosttag = self.request.headers.get('host')
if ':' in hosttag:
hosttag, hostport = hosttag.split(':', 1)
return host == hosttag
[docs] def check_origin(self, origin):
return self.isOrigHost(origin)
[docs] def getJsonBody(self, validator=None):
return self.loadJsonMesg(self.request.body, validator=validator)
[docs] def sendRestErr(self, code, mesg):
self.set_header('Content-Type', 'application/json')
return self.write({'status': 'err', 'code': code, 'mesg': mesg})
[docs] def sendRestExc(self, e):
self.set_header('Content-Type', 'application/json')
return self.sendRestErr(e.__class__.__name__, str(e))
[docs] def sendRestRetn(self, valu):
self.set_header('Content-Type', 'application/json')
return self.write({'status': 'ok', 'result': valu})
[docs] def loadJsonMesg(self, byts, validator=None):
try:
item = json.loads(byts)
if validator is not None:
validator(item)
return item
except s_exc.SchemaViolation as e:
self.sendRestErr('SchemaViolation', str(e))
return None
except Exception:
self.sendRestErr('SchemaViolation', 'Invalid JSON content.')
return None
[docs] def sendAuthRequired(self):
self.set_header('WWW-Authenticate', 'Basic realm=synapse')
self.set_status(401)
self.sendRestErr('NotAuthenticated', 'The session is not logged in.')
[docs] async def reqAuthUser(self):
if await self.authenticated():
return True
self.sendAuthRequired()
return False
[docs] async def reqAuthAdmin(self):
user = await self.user()
if user is None:
self.sendAuthRequired()
return False
if not user.isAdmin():
self.sendRestErr('AuthDeny', f'User {user.iden} ({user.name}) is not an admin.')
return False
return True
[docs] async def sess(self, gen=True):
if self._web_sess is None:
iden = self.get_secure_cookie('sess', max_age_days=14)
if iden is None and not gen:
return None
if iden is None:
iden = s_common.guid().encode()
opts = {'expires_days': 14, 'secure': True, 'httponly': True}
self.set_secure_cookie('sess', iden, **opts)
self._web_sess = await self.cell.genHttpSess(iden)
return self._web_sess
[docs] async def user(self):
if self._web_user is not None:
return self._web_user
sess = await self.sess(gen=False)
if sess is not None:
return sess.user
auth = self.request.headers.get('Authorization')
if auth is None:
return None
if not auth.startswith('Basic '):
return None
_, blob = auth.split(None, 1)
try:
text = base64.b64decode(blob).decode('utf8')
name, passwd = text.split(':', 1)
except Exception:
logger.exception('invalid basic auth header')
return None
user = await self.cell.auth.getUserByName(name)
if user is None:
return None
if user.isLocked():
return None
if not await user.tryPasswd(passwd):
return None
self._web_user = user
return user
[docs] async def useriden(self):
'''
Return the user iden of the current session user.
NOTE: APIs should migrate toward using this rather than the heavy
Handler.user() API to facilitate reuse of handler objects with
telepath references.
'''
user = await self.user()
if user is None:
return None
return user.iden
[docs] async def allowed(self, perm, gateiden=None):
'''
Return true if there is a logged in user with the given permissions.
NOTE: This API sets up HTTP response values if it returns False.
NOTE: This API uses the Handler.getAuthCell() abstraction and is safe for use
in split-auth cells.
'''
authcell = self.getAuthCell()
useriden = await self.useriden()
if useriden is None:
self.sendAuthRequired()
return False
if await authcell.isUserAllowed(useriden, perm, gateiden=gateiden):
return True
udef = await authcell.getUserDef(useriden)
username = udef.get('name')
self.set_status(403)
mesg = f'User ({username}) must have permission {".".join(perm)}'
self.sendRestErr('AuthDeny', mesg)
return False
[docs] async def authenticated(self):
return await self.useriden() is not None
[docs] async def getUserBody(self):
'''
Helper function to confirm that there is a auth user and a valid JSON body in the request.
Returns:
(User, object): The user and body of the request as deserialized JSON, or a tuple of s_common.novalu
objects if there was no user or json body.
'''
if not await self.reqAuthUser():
return (s_common.novalu, s_common.novalu)
body = self.getJsonBody()
if body is None:
return (s_common.novalu, s_common.novalu)
user = await self.user()
return (user, body)
[docs]class WebSocket(HandlerBase, t_websocket.WebSocketHandler):
[docs] async def xmit(self, name, **info):
await self.write_message(json.dumps({'type': name, 'data': info}))
async def _reqUserAllow(self, perm):
user = await self.user()
if user is None:
mesg = 'Session is not authenticated.'
raise s_exc.AuthDeny(mesg=mesg, perm=perm)
if not user.allowed(perm):
ptxt = '.'.join(perm)
mesg = f'Permission denied: {ptxt}.'
raise s_exc.AuthDeny(mesg=mesg, perm=perm)
[docs]class Handler(HandlerBase, t_web.RequestHandler):
[docs] def prepare(self):
self.task = asyncio.current_task()
[docs] def on_connection_close(self):
if hasattr(self, 'task'):
self.task.cancel()
async def _reqValidOpts(self, opts):
if opts is None:
opts = {}
authcell = self.getAuthCell()
useriden = await self.useriden()
opts.setdefault('user', useriden)
if opts.get('user') != useriden:
if not await self.allowed(('impersonate',)):
return None
return opts
[docs]class RobotHandler(HandlerBase, t_web.RequestHandler):
[docs] async def get(self):
self.write('User-agent: *\n')
self.write('Disallow: /\n')
[docs]@t_web.stream_request_body
class StreamHandler(Handler):
'''
Subclass for Tornado streaming uploads.
Notes:
- Async method prepare() is called after headers are read but before body processing.
- Sync method on_finish() can be used to cleanup after a request.
- Sync method on_connection_close() can be used to cleanup after a client disconnect.
- Async methods post(), put(), etc are called after the streaming has completed.
'''
[docs] async def data_received(self, chunk):
raise s_exc.NoSuchImpl(mesg='data_received must be implemented by subclasses.',
name='data_received')
[docs]class StormHandler(Handler):
[docs] def getCore(self):
# add an abstraction to allow subclasses to dictate how
# a reference to the cortex is returned from the handler.
return self.cell
[docs]class StormNodesV1(StormHandler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
user, body = await self.getUserBody()
if body is s_common.novalu:
return
s_common.deprecated('HTTP API /api/v1/storm/nodes', curv='2.110.0')
# dont allow a user to be specified
opts = body.get('opts')
query = body.get('query')
stream = body.get('stream')
jsonlines = stream == 'jsonlines'
await self.cell.boss.promote('storm', user=user, info={'query': query})
opts = await self._reqValidOpts(opts)
if opts is None:
return
view = self.cell._viewFromOpts(opts)
async for pode in view.iterStormPodes(query, opts=opts):
self.write(json.dumps(pode))
if jsonlines:
self.write("\n")
await self.flush()
[docs]class StormV1(StormHandler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthUser():
return
body = self.getJsonBody()
if body is None:
return
opts = body.get('opts')
query = body.get('query')
stream = body.get('stream')
jsonlines = stream == 'jsonlines'
# Maintain backwards compatibility with 0.1.x output
opts = await self._reqValidOpts(opts)
if opts is None:
return
opts['editformat'] = 'splices'
async for mesg in self.getCore().storm(query, opts=opts):
self.write(json.dumps(mesg))
if jsonlines:
self.write("\n")
await self.flush()
[docs]class StormCallV1(StormHandler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthUser():
return
body = self.getJsonBody()
if body is None:
return
opts = body.get('opts')
query = body.get('query')
opts = await self._reqValidOpts(opts)
if opts is None:
return
try:
ret = await self.getCore().callStorm(query, opts=opts)
except s_exc.SynErr as e:
mesg = e.get('mesg', str(e))
return self.sendRestErr(e.__class__.__name__, mesg)
except asyncio.CancelledError: # pragma: no cover
raise
except Exception as e:
mesg = str(e)
return self.sendRestErr(e.__class__.__name__, mesg)
else:
return self.sendRestRetn(ret)
[docs]class StormExportV1(StormHandler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthUser():
return
body = self.getJsonBody()
if body is None:
return
opts = body.get('opts')
query = body.get('query')
opts = await self._reqValidOpts(opts)
if opts is None:
return
try:
self.set_header('Content-Type', 'application/x-synapse-nodes')
async for pode in self.getCore().exportStorm(query, opts=opts):
self.write(s_msgpack.en(pode))
await self.flush()
except Exception as e:
return self.sendRestExc(e)
[docs]class ReqValidStormV1(StormHandler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
_, body = await self.getUserBody()
if body is s_common.novalu:
return
opts = body.get('opts', {})
query = body.get('query')
try:
valid = await self.cell.reqValidStorm(query, opts)
except s_exc.SynErr as e:
mesg = e.get('mesg', str(e))
return self.sendRestErr(e.__class__.__name__, mesg)
else:
return self.sendRestRetn(valid)
[docs]class WatchSockV1(WebSocket):
'''
A web-socket based API endpoint for distributing cortex tag events.
Deprecated.
'''
[docs] async def onWatchMesg(self, byts):
try:
wdef = json.loads(byts)
iden = wdef.get('view', self.cell.view.iden)
perm = ('watch', 'view', iden)
await self._reqUserAllow(perm)
async with self.cell.watcher(wdef) as watcher:
await self.xmit('init')
async for mesg in watcher:
await self.xmit(mesg[0], **mesg[1])
# pragma: no cover
# (this would only happen on slow-consumer)
await self.xmit('fini')
except s_exc.SynErr as e:
text = e.get('mesg', str(e))
await self.xmit('errx', code=e.__class__.__name__, mesg=text)
except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
raise
except Exception as e:
await self.xmit('errx', code=e.__class__.__name__, mesg=str(e))
[docs] async def on_message(self, byts):
s_common.deprecated('WatchSockV1')
self.cell.schedCoro(self.onWatchMesg(byts))
[docs]class BeholdSockV1(WebSocket):
[docs] async def isUserAdmin(self):
user = await self.user()
if user is None:
return False
if not user.isAdmin():
return False
return True
[docs] async def onInitMessage(self, byts):
try:
mesg = json.loads(byts)
if mesg.get('type') != 'call:init':
raise s_exc.BadMesgFormat('Invalid initial message')
admin = await self.isUserAdmin()
if not admin:
await self.xmit('errx', code='AuthDeny', mesg='Beholder API requires admin privs')
return
async with self.cell.beholder() as beholder:
await self.xmit('init')
async for mesg in beholder:
await self.xmit('iter', **mesg)
await self.xmit('fini')
except s_exc.SynErr as e:
text = e.get('mesg', str(e))
await self.xmit('errx', code=e.__class__.__name__, mesg=text)
except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
raise
except Exception as e:
await self.xmit('errx', code=e.__class__.__name__, mesg=str(e))
[docs] async def on_message(self, byts):
self.cell.schedCoro(self.onInitMessage(byts))
[docs]class LoginV1(Handler):
[docs] async def post(self):
body = self.getJsonBody()
if body is None:
return
name = body.get('user')
passwd = body.get('passwd')
user = await self.cell.auth.getUserByName(name)
if user is None:
return self.sendRestErr('AuthDeny', 'No such user.')
if not await user.tryPasswd(passwd):
return self.sendRestErr('AuthDeny', 'Incorrect password.')
sess = await self.sess()
await sess.login(user)
return self.sendRestRetn(user.pack())
[docs]class AuthUsersV1(Handler):
[docs] async def get(self):
if not await self.reqAuthUser():
return
try:
archived = int(self.get_argument('archived', default='0'))
if archived not in (0, 1):
return self.sendRestErr('BadHttpParam', 'The parameter "archived" must be 0 or 1 if specified.')
except Exception:
return self.sendRestErr('BadHttpParam', 'The parameter "archived" must be 0 or 1 if specified.')
if archived:
self.sendRestRetn([u.pack() for u in self.cell.auth.users()])
return
self.sendRestRetn([u.pack() for u in self.cell.auth.users() if not u.info.get('archived')])
return
[docs]class AuthRolesV1(Handler):
[docs] async def get(self):
if not await self.reqAuthUser():
return
self.sendRestRetn([r.pack() for r in self.cell.auth.roles()])
[docs]class AuthUserV1(Handler):
[docs] async def get(self, iden):
if not await self.reqAuthUser():
return
user = self.cell.auth.user(iden)
if user is None:
self.sendRestErr('NoSuchUser', f'User {iden} does not exist.')
return
self.sendRestRetn(user.pack())
[docs] async def post(self, iden):
# TODO allow user to change their own name / email via this API
if not await self.reqAuthAdmin():
return
user = self.cell.auth.user(iden)
if user is None:
self.sendRestErr('NoSuchUser', f'User {iden} does not exist.')
return
body = self.getJsonBody()
if body is None:
return
name = body.get('name')
if name is not None:
await user.setName(str(name))
email = body.get('email')
if email is not None:
await user.info.set('email', email)
locked = body.get('locked')
if locked is not None:
await user.setLocked(bool(locked))
rules = body.get('rules')
if rules is not None:
await user.setRules(rules)
admin = body.get('admin')
if admin is not None:
await user.setAdmin(bool(admin))
archived = body.get('archived')
if archived is not None:
await user.setArchived(bool(archived))
self.sendRestRetn(user.pack())
[docs]class AuthUserPasswdV1(Handler):
[docs] async def post(self, iden):
current_user, body = await self.getUserBody()
if body is s_common.novalu:
return
user = self.cell.auth.user(iden)
if user is None:
self.sendRestErr('NoSuchUser', f'User does not exist: {iden}')
return
password = body.get('passwd')
if current_user.isAdmin() or current_user.iden == user.iden:
try:
await user.setPasswd(password)
except s_exc.BadArg as e:
self.sendRestErr('BadArg', e.get('mesg'))
return
self.sendRestRetn(user.pack())
[docs]class AuthRoleV1(Handler):
[docs] async def get(self, iden):
if not await self.reqAuthUser():
return
role = self.cell.auth.role(iden)
if role is None:
self.sendRestErr('NoSuchRole', f'Role {iden} does not exist.')
return
self.sendRestRetn(role.pack())
[docs] async def post(self, iden):
if not await self.reqAuthAdmin():
return
role = self.cell.auth.role(iden)
if role is None:
self.sendRestErr('NoSuchRole', f'Role {iden} does not exist.')
return
body = self.getJsonBody()
if body is None:
return
rules = body.get('rules')
if rules is not None:
await role.setRules(rules)
self.sendRestRetn(role.pack())
[docs]class AuthGrantV1(Handler):
'''
/api/v1/auth/grant?user=iden&role=iden
'''
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
iden = body.get('user')
user = self.cell.auth.user(iden)
if user is None:
self.sendRestErr('NoSuchUser', f'User iden {iden} not found.')
return
iden = body.get('role')
role = self.cell.auth.role(iden)
if role is None:
self.sendRestErr('NoSuchRole', f'Role iden {iden} not found.')
return
await user.grant(role.iden)
self.sendRestRetn(user.pack())
return
[docs]class AuthRevokeV1(Handler):
'''
/api/v1/auth/grant?user=iden&role=iden
'''
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
iden = body.get('user')
user = self.cell.auth.user(iden)
if user is None:
self.sendRestErr('NoSuchUser', f'User iden {iden} not found.')
return
iden = body.get('role')
role = self.cell.auth.role(iden)
if role is None:
self.sendRestErr('NoSuchRole', f'Role iden {iden} not found.')
return
await user.revoke(role.iden)
self.sendRestRetn(user.pack())
return
[docs]class AuthAddUserV1(Handler):
[docs] async def post(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
name = body.get('name')
if name is None:
self.sendRestErr('MissingField', 'The adduser API requires a "name" argument.')
return
if await self.cell.auth.getUserByName(name) is not None:
self.sendRestErr('DupUser', f'A user named {name} already exists.')
return
user = await self.cell.auth.addUser(name)
passwd = body.get('passwd', None)
if passwd is not None:
await user.setPasswd(passwd)
admin = body.get('admin', None)
if admin is not None:
await user.setAdmin(bool(admin))
email = body.get('email', None)
if email is not None:
await user.info.set('email', email)
rules = body.get('rules')
if rules is not None:
await user.setRules(rules)
self.sendRestRetn(user.pack())
return
[docs]class AuthAddRoleV1(Handler):
[docs] async def post(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
name = body.get('name')
if name is None:
self.sendRestErr('MissingField', 'The addrole API requires a "name" argument.')
return
if await self.cell.auth.getRoleByName(name) is not None:
self.sendRestErr('DupRole', f'A role named {name} already exists.')
return
role = await self.cell.auth.addRole(name)
rules = body.get('rules', None)
if rules is not None:
await role.setRules(rules)
self.sendRestRetn(role.pack())
return
[docs]class AuthDelRoleV1(Handler):
[docs] async def post(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
name = body.get('name')
if name is None:
self.sendRestErr('MissingField', 'The delrole API requires a "name" argument.')
return
role = await self.cell.auth.getRoleByName(name)
if role is None:
return self.sendRestErr('NoSuchRole', f'The role {name} does not exist!')
await self.cell.auth.delRole(role.iden)
self.sendRestRetn(None)
return
[docs]class ModelNormV1(Handler):
[docs] async def post(self):
return await self.get()
[docs] async def get(self):
if not await self.reqAuthUser():
return
body = self.getJsonBody()
if body is None:
return
propname = body.get('prop')
propvalu = body.get('value')
if propname is None:
self.sendRestErr('MissingField', 'The property normalization API requires a prop name.')
return
try:
valu, info = await self.cell.getPropNorm(propname, propvalu)
except s_exc.NoSuchProp:
return self.sendRestErr('NoSuchProp', 'The property {propname} does not exist.')
except Exception as e:
return self.sendRestExc(e)
else:
self.sendRestRetn({'norm': valu, 'info': info})
[docs]class ModelV1(Handler):
[docs] async def get(self):
if not await self.reqAuthUser():
return
resp = await self.cell.getModelDict()
return self.sendRestRetn(resp)
[docs]class HealthCheckV1(Handler):
[docs] async def get(self):
if not await self.allowed(('health', )):
return
resp = await self.cell.getHealthCheck()
return self.sendRestRetn(resp)
[docs]class ActiveV1(Handler):
[docs] async def get(self):
resp = {'active': self.cell.isactive}
return self.sendRestRetn(resp)
[docs]class StormVarsGetV1(Handler):
[docs] async def get(self):
body = self.getJsonBody()
if body is None:
return
varname = str(body.get('name'))
defvalu = body.get('default')
if not await self.allowed(('globals', 'get', varname)):
return
valu = await self.cell.getStormVar(varname, default=defvalu)
return self.sendRestRetn(valu)
[docs]class StormVarsPopV1(Handler):
[docs] async def post(self):
body = self.getJsonBody()
if body is None:
return
varname = str(body.get('name'))
defvalu = body.get('default')
if not await self.allowed(('globals', 'pop', varname)):
return
valu = await self.cell.popStormVar(varname, default=defvalu)
return self.sendRestRetn(valu)
[docs]class OnePassIssueV1(Handler):
'''
/api/v1/auth/onepass/issue
'''
[docs] async def post(self):
if not await self.reqAuthAdmin():
return
body = self.getJsonBody()
if body is None:
return
useriden = body.get('user')
duration = body.get('duration', 600000) # 10 mins default
user = self.cell.auth.user(useriden)
if user is None:
return self.sendRestErr('NoSuchUser', 'The user iden does not exist.')
passwd = s_common.guid()
shadow = await s_passwd.getShadowV2(passwd=passwd)
now = s_common.now()
onepass = {'create': now, 'expires': s_common.now() + duration,
'shadow': shadow}
await self.cell.auth.setUserInfo(useriden, 'onepass', onepass)
logger.debug(f'Issued one time password for {user.name}',
extra={'synapse': {'user': user.iden, 'username': user.name}})
return self.sendRestRetn(passwd)
[docs]class FeedV1(Handler):
'''
/api/v1/feed
Examples:
Example data::
{
'name': 'syn.nodes',
'view': null,
'items': [...],
}
'''
[docs] async def post(self):
if not await self.reqAuthUser():
return
user = await self.user()
body = self.getJsonBody()
if body is None:
return
items = body.get('items')
name = body.get('name', 'syn.nodes')
func = self.cell.getFeedFunc(name)
if func is None:
return self.sendRestErr('NoSuchFunc', f'The feed type {name} does not exist.')
view = self.cell.getView(body.get('view'), user)
if view is None:
return self.sendRestErr('NoSuchView', 'The specified view does not exist.')
wlyr = view.layers[0]
perm = ('feed:data', *name.split('.'))
if not user.allowed(perm, gateiden=wlyr.iden):
permtext = '.'.join(perm)
mesg = f'User does not have {permtext} permission on gate: {wlyr.iden}.'
return self.sendRestErr('AuthDeny', mesg)
try:
info = {'name': name, 'view': view.iden, 'nitems': len(items)}
await self.cell.boss.promote('feeddata', user=user, info=info)
async with await self.cell.snap(user=user, view=view) as snap:
snap.strict = False
await snap.addFeedData(name, items)
return self.sendRestRetn(None)
except Exception as e: # pragma: no cover
return self.sendRestExc(e)
[docs]class CoreInfoV1(Handler):
'''
/api/v1/core/info
'''
[docs] async def get(self):
if not await self.reqAuthUser():
return
resp = await self.cell.getCoreInfoV2()
return self.sendRestRetn(resp)