import os
import json
import signal
import asyncio
import logging
import traceback
import collections
import regex
from prompt_toolkit import PromptSession, print_formatted_text
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.history import FileHistory
from prompt_toolkit.patch_stdout import patch_stdout
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.lib.base as s_base
import synapse.lib.output as s_output
import synapse.lib.parser as s_parser
import synapse.lib.grammar as s_grammar
import synapse.lib.version as s_version
logger = logging.getLogger(__name__)
[docs]
class Cmd:
'''
Base class for modular commands in the synapse CLI.
'''
_cmd_name = 'fixme'
_cmd_syntax = ()
def __init__(self, cli, **opts):
self._cmd_cli = cli
self._cmd_opts = opts
[docs]
async def runCmdLine(self, line):
'''
Run a line of command input for this command.
Args:
line (str): Line to execute
Examples:
Run the foo command with some arguments:
await foo.runCmdLine('foo --opt baz woot.com')
'''
opts = self.getCmdOpts(line)
return await self.runCmdOpts(opts)
[docs]
def getCmdItem(self):
'''
Get a reference to the object we are commanding.
'''
return self._cmd_cli.item
[docs]
def getCmdOpts(self, text):
'''
Use the _cmd_syntax def to split/parse/normalize the cmd line.
Args:
text (str): Command to process.
Notes:
This is implemented independent of argparse (et al) due to the
need for syntax aware argument splitting. Also, allows different
split per command type
Returns:
dict: An opts dictionary.
'''
off = 0
_, off = s_grammar.nom(text, off, s_grammar.whites)
name, off = s_grammar.meh(text, off, s_grammar.whites)
_, off = s_grammar.nom(text, off, s_grammar.whites)
opts = {}
args = collections.deque([synt for synt in self._cmd_syntax if not synt[0].startswith('-')])
switches = {synt[0]: synt for synt in self._cmd_syntax if synt[0].startswith('-')}
# populate defaults and lists
for synt in self._cmd_syntax:
snam = synt[0].strip('-')
defval = synt[1].get('defval')
if defval is not None:
opts[snam] = defval
if synt[1].get('type') == 'list':
opts[snam] = []
def atswitch(t, o):
# check if we are at a recognized switch. if not
# assume the data is part of regular arguments.
if not text.startswith('-', o):
return None, o
name, x = s_grammar.meh(t, o, s_grammar.whites)
swit = switches.get(name)
if swit is None:
return None, o
return swit, x
while off < len(text):
_, off = s_grammar.nom(text, off, s_grammar.whites)
swit, off = atswitch(text, off)
if swit is not None:
styp = swit[1].get('type', 'flag')
snam = swit[0].strip('-')
if styp == 'valu':
valu, off = s_parser.parse_cmd_string(text, off)
opts[snam] = valu
elif styp == 'list':
valu, off = s_parser.parse_cmd_string(text, off)
if not isinstance(valu, list):
valu = valu.split(',')
opts[snam].extend(valu)
elif styp == 'enum':
vals = swit[1].get('enum:vals')
valu, off = s_parser.parse_cmd_string(text, off)
if valu not in vals:
raise s_exc.BadSyntax(mesg='%s (%s)' % (swit[0], '|'.join(vals)),
text=text)
opts[snam] = valu
else:
opts[snam] = True
continue
if not args:
raise s_exc.BadSyntax(mesg='trailing text: [%s]' % (text[off:],),
text=text)
synt = args.popleft()
styp = synt[1].get('type', 'valu')
# a glob type eats the remainder of the string
if styp == 'glob':
opts[synt[0]] = text[off:]
break
# eat the remainder of the string as separate vals
if styp == 'list':
valu = []
while off < len(text):
item, off = s_parser.parse_cmd_string(text, off)
valu.append(item)
opts[synt[0]] = valu
break
valu, off = s_parser.parse_cmd_string(text, off)
opts[synt[0]] = valu
return opts
[docs]
def getCmdBrief(self):
'''
Return the single-line description for this command.
'''
return self.getCmdDoc().strip().split('\n', 1)[0].strip()
[docs]
def getCmdName(self):
return self._cmd_name
[docs]
def getCmdDoc(self):
'''
Return the help/doc output for this command.
'''
if not self.__doc__: # pragma: no cover
return ''
return self.__doc__
[docs]
def printf(self, mesg, addnl=True, color=None):
return self._cmd_cli.printf(mesg, addnl=addnl, color=color)
[docs]
async def runCmdOpts(self, opts):
'''
Perform the command actions. Must be implemented by Cmd implementers.
Args:
opts (dict): Options dictionary.
'''
raise s_exc.NoSuchImpl(mesg='runCmdOpts must be implemented by subclasses.',
name='runCmdOpts')
_setre = regex.compile(r'\s*set\s+editing-mode\s+vi\s*')
def _inputrc_enables_vi_mode():
'''
Emulate a small bit of readline behavior.
Returns:
(bool) True if current user enabled vi mode ("set editing-mode vi") in .inputrc
'''
for filepath in (os.path.expanduser('~/.inputrc'), '/etc/inputrc'):
try:
with open(filepath) as f:
for line in f:
if _setre.fullmatch(line):
return True
except IOError:
continue
return False
[docs]
class Cli(s_base.Base):
'''
A modular / event-driven CLI base object.
'''
histfile = 'cmdr_history'
async def __anit__(self, item, outp=None, **locs):
await s_base.Base.__anit__(self)
if outp is None:
outp = s_output.OutPut()
self.outp = outp
self.locs = locs
self.cmdtask = None # type: asyncio.Task
self.sess = None
self.vi_mode = _inputrc_enables_vi_mode()
self.item = item # whatever object we are commanding
self.echoline = False
self.colorsenabled = False
self.completer = None
if isinstance(self.item, s_base.Base):
self.item.onfini(self._onItemFini)
self.locs['syn:local:version'] = s_version.verstring
if isinstance(self.item, s_telepath.Proxy):
version = self.item._getSynVers()
if version is None: # pragma: no cover
self.locs['syn:remote:version'] = 'Remote Synapse version unavailable'
else:
self.locs['syn:remote:version'] = '.'.join([str(v) for v in version])
self.cmds = {}
self.cmdprompt = 'cli> '
self.initCmdClasses()
[docs]
def initCmdClasses(self):
self.addCmdClass(CmdHelp)
self.addCmdClass(CmdQuit)
self.addCmdClass(CmdLocals)
async def _onItemFini(self):
if self.isfini:
return
self.printf('connection closed...')
await self.fini()
[docs]
async def addSignalHandlers(self): # pragma: no cover
'''
Register SIGINT signal handler with the ioloop to cancel the currently running cmdloop task.
Removes the handler when the cli is fini'd.
'''
def sigint():
if self.cmdtask is not None:
self.cmdtask.cancel()
self.loop.add_signal_handler(signal.SIGINT, sigint)
def onfini():
# N.B. This is reaches into some loop / handle internals but
# prevents us from removing a handler that overwrote our own.
hndl = self.loop._signal_handlers.get(signal.SIGINT, None) # type: asyncio.Handle
if hndl is not None and hndl._callback is sigint:
self.loop.remove_signal_handler(signal.SIGINT)
self.onfini(onfini)
[docs]
def get(self, name, defval=None):
return self.locs.get(name, defval)
[docs]
def set(self, name, valu):
self.locs[name] = valu
[docs]
async def prompt(self, text=None):
'''
Prompt for user input from stdin.
'''
if self.sess is None:
history = None
histfp = s_common.getSynPath(self.histfile)
# Ensure the file is read/writeable
try:
with s_common.genfile(histfp):
pass
history = FileHistory(histfp)
except OSError: # pragma: no cover
logger.warning(f'Unable to create file at {histfp}, cli history will not be stored.')
self.sess = PromptSession(
history=history,
completer=self.completer,
complete_while_typing=False,
reserve_space_for_menu=5,
)
if text is None:
text = self.cmdprompt
with patch_stdout(): # pragma: no cover
retn = await self.sess.prompt_async(text,
vi_mode=self.vi_mode,
enable_open_in_editor=True,
handle_sigint=False # We handle sigint in the loop
)
return retn
[docs]
def printf(self, mesg, addnl=True, color=None):
if not self.colorsenabled:
return self.outp.printf(mesg, addnl=addnl)
# print_formatted_text can't handle \r
mesg = mesg.replace('\r', '')
if color is not None:
mesg = FormattedText([(color, mesg)])
return print_formatted_text(mesg, end='\n' if addnl else '')
[docs]
def addCmdClass(self, ctor, **opts):
'''
Add a Cmd subclass to this cli.
'''
item = ctor(self, **opts)
name = item.getCmdName()
self.cmds[name] = item
[docs]
def getCmdNames(self):
'''
Return a list of all the known command names for the CLI.
'''
return list(self.cmds.keys())
[docs]
def getCmdByName(self, name):
'''
Return a Cmd instance by name.
'''
return self.cmds.get(name)
[docs]
def getCmdPrompt(self):
'''
Get the command prompt.
Returns:
str: Configured command prompt
'''
return self.cmdprompt
[docs]
async def runCmdLoop(self):
'''
Run commands from a user in an interactive fashion until fini() or EOFError is raised.
'''
while not self.isfini:
self.cmdtask = None
try:
line = await self.prompt()
if not line:
continue
line = line.strip()
if not line:
continue
coro = self.runCmdLine(line)
self.cmdtask = self.schedCoro(coro)
await self.cmdtask
except (KeyboardInterrupt, asyncio.CancelledError):
if self.isfini:
return
self.printf('<ctrl-c>')
except (s_exc.CliFini, EOFError):
await self.fini()
except Exception:
s = traceback.format_exc()
self.printf(s)
finally:
if self.cmdtask is not None:
self.cmdtask.cancel()
try:
await asyncio.wait_for(self.cmdtask, timeout=0.1)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
[docs]
async def runCmdLine(self, line):
'''
Run a single command line.
Args:
line (str): Line to execute.
Examples:
Execute the 'woot' command with the 'help' switch:
await cli.runCmdLine('woot --help')
Returns:
object: Arbitrary data from the cmd class.
'''
if self.echoline:
self.outp.printf(f'{self.cmdprompt}{line}')
ret = None
name = line.split(None, 1)[0]
cmdo = self.getCmdByName(name)
if cmdo is None:
self.printf('cmd not found: %s' % (name,))
return
try:
ret = await cmdo.runCmdLine(line)
except s_exc.CliFini:
await self.fini()
except asyncio.CancelledError:
self.printf('Cmd cancelled')
except s_exc.ParserExit as e:
pass # avoid duplicate print
except Exception as e:
exctxt = traceback.format_exc()
self.printf(exctxt)
self.printf('error: %s' % e)
return ret
[docs]
class CmdQuit(Cmd):
'''
Quit the current command line interpreter.
Example:
quit
'''
_cmd_name = 'quit'
[docs]
async def runCmdOpts(self, opts):
self.printf('o/')
raise s_exc.CliFini()
[docs]
class CmdHelp(Cmd):
'''
List commands and display help output.
Example:
help foocmd
'''
_cmd_name = 'help'
_cmd_syntax = (
('cmds', {'type': 'list'}), # type: ignore
)
[docs]
async def runCmdOpts(self, opts):
cmds = opts.get('cmds')
# if they didn't specify one, just show the list
if not cmds:
cmds = sorted(self._cmd_cli.getCmdNames())
padsize = max([len(n) for n in cmds])
for name in cmds:
padname = name.ljust(padsize)
cmdo = self._cmd_cli.getCmdByName(name)
brief = cmdo.getCmdBrief()
self.printf('%s - %s' % (padname, brief))
return
for name in cmds:
cmdo = self._cmd_cli.getCmdByName(name)
if cmdo is None:
self.printf('=== NOT FOUND: %s' % (name,))
continue
self.printf('=== %s' % (name,))
self.printf(cmdo.getCmdDoc())
return
[docs]
class CmdLocals(Cmd):
'''
List the current locals for a given CLI object.
'''
_cmd_name = 'locs'
[docs]
async def runCmdOpts(self, opts):
ret = {}
for k, v in self._cmd_cli.locs.items():
if isinstance(v, (int, str)):
ret[k] = v
else:
ret[k] = repr(v)
mesg = json.dumps(ret, indent=2, sort_keys=True)
self.printf(mesg)