import time
import types
import calendar
import datetime
import functools
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.cli as s_cli
import synapse.lib.cmd as s_cmd
import synapse.lib.time as s_time
import synapse.lib.parser as s_parser
StatHelp = '''
Gives detailed information about a single cron job.
Syntax:
cron stat <iden prefix>
Notes:
Any prefix that matches exactly one valid cron job iden is accepted.
'''
DelHelp = '''
Deletes a single cron job.
Syntax:
cron del|rm <iden prefix>
Notes:
Any prefix that matches exactly one valid cron job iden is accepted.
'''
ListHelp = '''
List existing cron jobs in a cortex.
Syntax:
cron list|ls
Example:
user iden en? rpt? now? err? # start last start last end query
root 029ce7bd.. Y Y N 17863 2019-06-11T21:47 2019-06-11T21:47 exec foo
root 06b46533.. Y Y N 18140 2019-06-11T21:48 2019-06-11T21:48 exec bar
'''
ModHelp = '''
Changes an existing cron job's query.
Syntax:
cron mod|edit <iden prefix> <new query>
Notes:
Any prefix that matches exactly one valid cron iden is accepted.
'''
EnableHelp = '''
Enable an existing cron job.
Syntax:
cron enable <iden prefix>
Notes:
Any prefix that matches exactly one valid cron iden is accepted.
'''
DisableHelp = '''
Disable an existing cron job.
Syntax:
cron disable <iden prefix>
Notes:
Any prefix that matches exactly one valid cron iden is accepted.
'''
AddHelp = '''
Add a recurring cron job to a cortex.
Syntax:
cron add [optional arguments] {query}
--minute, -M int[,int...][=]
--hour, -H
--day, -d
--month, -m
--year, -y
*or:*
[--hourly <min> |
--daily <hour>:<min> |
--monthly <day>:<hour>:<min> |
--yearly <month>:<day>:<hour>:<min>]
Notes:
All times are interpreted as UTC.
All arguments are interpreted as the job period, unless the value ends in
an equals sign, in which case the argument is interpreted as the recurrence
period. Only one recurrence period parameter may be specified.
Currently, a fixed unit must not be larger than a specified recurrence
period. i.e. '--hour 7 --minute +15' (every 15 minutes from 7-8am?) is not
supported.
Value values for fixed hours are 0-23 on a 24-hour clock where midnight is 0.
If the --day parameter value does not start with in '+' and is an integer, it is
interpreted as a fixed day of the month. A negative integer may be
specified to count from the end of the month with -1 meaning the last day
of the month. All fixed day values are clamped to valid days, so for
example '-d 31' will run on February 28.
If the fixed day parameter is a value in ([Mon, Tue, Wed, Thu, Fri, Sat,
Sun] if locale is set to English) it is interpreted as a fixed day of the
week.
Otherwise, if the parameter value starts with a '+', then it is interpreted
as an recurrence interval of that many days.
If no plus-sign-starting parameter is specified, the recurrence period
defaults to the unit larger than all the fixed parameters. e.g. '-M 5'
means every hour at 5 minutes past, and -H 3, -M 1 means 3:01 every day.
At least one optional parameter must be provided.
All parameters accept multiple comma-separated values. If multiple
parameters have multiple values, all combinations of those values are used.
All fixed units not specified lower than the recurrence period default to
the lowest valid value, e.g. -m +2 will be scheduled at 12:00am the first of
every other month. One exception is the largest fixed value is day of the
week, then the default period is set to be a week.
A month period with a day of week fixed value is not currently supported.
Fixed-value year (i.e. --year 2019) is not supported. See the 'at'
command for one-time cron jobs.
As an alternative to the above options, one may use exactly one of
--hourly, --daily, --monthly, --yearly with a colon-separated list of
fixed parameters for the value. It is an error to use both the individual
options and these aliases at the same time.
Examples:
Run a query every last day of the month at 3 am
cron add -H 3 -d-1 {#foo}
Run a query every 8 hours
cron add -H +8 {#foo}
Run a query every Wednesday and Sunday at midnight and noon
cron add -H 0,12 -d Wed,Sun {#foo}
Run a query every other day at 3:57pm
cron add -d +2 -M 57 -H 15 {#foo}
'''
[docs]class Cron(s_cli.Cmd):
'''
Manages cron jobs in a cortex.
Cron jobs are rules persistently stored in a cortex such that storm queries
automatically run on a time schedule.
Cron jobs may be be recurring or one-time. Use the 'at' command to add
one-time jobs.
A subcommand is required. Use 'cron -h' for more detailed help. '''
_cmd_name = 'cron'
_cmd_syntax = (
('line', {'type': 'glob'}), # type: ignore
)
async def _match_idens(self, core, prefix):
'''
Returns the iden that starts with prefix. Prints out error and returns None if it doesn't match
exactly one.
'''
idens = [cron['iden'] for cron in await core.listCronJobs()]
matches = [iden for iden in idens if iden.startswith(prefix)]
if len(matches) == 1:
return matches[0]
elif len(matches) == 0:
self.printf('Error: provided iden does not match any valid authorized cron job')
else:
self.printf('Error: provided iden matches more than one cron job')
return None
def _make_argparser(self):
parser = s_cmd.Parser(prog='cron', outp=self, description=self.__doc__)
subparsers = parser.add_subparsers(title='subcommands', required=True, dest='cmd',
parser_class=functools.partial(s_cmd.Parser, outp=self))
subparsers.add_parser('list', aliases=['ls'], help="List cron jobs you're allowed to manipulate",
usage=ListHelp)
parser_add = subparsers.add_parser('add', help='add a cron job', usage=AddHelp)
parser_add.add_argument('--minute', '-M')
parser_add.add_argument('--hour', '-H')
parser_add.add_argument('--day', '-d', help='day of week, day of month or number of days')
parser_add.add_argument('--month', '-m')
parser_add.add_argument('--year', '-y')
group = parser_add.add_mutually_exclusive_group()
group.add_argument('--hourly')
group.add_argument('--daily')
group.add_argument('--monthly')
group.add_argument('--yearly')
parser_add.add_argument('query', help='Storm query in curly braces')
parser_del = subparsers.add_parser('del', aliases=['rm'], help='delete a cron job', usage=DelHelp)
parser_del.add_argument('prefix', help='Cron job iden prefix')
parser_stat = subparsers.add_parser('stat', help='details a cron job', usage=StatHelp)
parser_stat.add_argument('prefix', help='Cron job iden prefix')
parser_mod = subparsers.add_parser('mod', aliases=['edit'], help='change an existing cron job', usage=ModHelp)
parser_mod.add_argument('prefix', help='Cron job iden prefix')
parser_mod.add_argument('query', help='New Storm query in curly braces')
parser_en = subparsers.add_parser('enable', help='enable an existing cron job', usage=EnableHelp)
parser_en.add_argument('prefix', help='Cron job iden prefix')
parser_dis = subparsers.add_parser('disable', help='disable an existing cron job', usage=DisableHelp)
parser_dis.add_argument('prefix', help='Cron job iden prefix')
return parser
@staticmethod
def _parse_weekday(val):
''' Try to match a day-of-week abbreviation, then try a day-of-week full name '''
val = val.title()
try:
return list(calendar.day_abbr).index(val)
except ValueError:
try:
return list(calendar.day_name).index(val)
except ValueError:
return None
@staticmethod
def _parse_incval(incunit, incval):
''' Parse a non-day increment value. Should be an integer or a comma-separated integer list. '''
try:
retn = [int(val) for val in incval.split(',')]
except ValueError:
return None
return retn[0] if len(retn) == 1 else retn
@staticmethod
def _parse_req(requnit, reqval):
''' Parse a non-day fixed value '''
assert reqval[0] != '='
try:
retn = []
for val in reqval.split(','):
if requnit == 'month':
if reqval[0].isdigit():
retn.append(int(reqval)) # must be a month (1-12)
else:
try:
retn.append(list(calendar.month_abbr).index(val.title()))
except ValueError:
retn.append(list(calendar.month_name).index(val.title()))
else:
retn.append(int(val))
except ValueError:
return None
if not retn:
return None
return retn[0] if len(retn) == 1 else retn
@staticmethod
def _parse_day(optval):
''' Parse a --day argument '''
isreq = not optval.startswith('+')
if not isreq:
optval = optval[1:]
try:
retnval = []
unit = None
for val in optval.split(','):
if not val:
raise ValueError
if val[-1].isdigit():
newunit = 'dayofmonth' if isreq else 'day'
if unit is None:
unit = newunit
elif newunit != unit:
raise ValueError
retnval.append(int(val))
else:
newunit = 'dayofweek'
if unit is None:
unit = newunit
elif newunit != unit:
raise ValueError
weekday = Cron._parse_weekday(val)
if weekday is None:
raise ValueError
retnval.append(weekday)
if len(retnval) == 0:
raise ValueError
except ValueError:
return None, None
if len(retnval) == 1:
retnval = retnval[0]
return unit, retnval
def _parse_alias(self, opts):
retn = types.SimpleNamespace()
retn.query = opts.query
if opts.hourly is not None:
retn.hour = '+1'
retn.minute = str(int(opts.hourly))
return retn
if opts.daily is not None:
fields = time.strptime(opts.daily, '%H:%M')
retn.day = '+1'
retn.hour = str(fields.tm_hour)
retn.minute = str(fields.tm_min)
return retn
if opts.monthly is not None:
day, rest = opts.monthly.split(':', 1)
fields = time.strptime(rest, '%H:%M')
retn.month = '+1'
retn.day = day
retn.hour = str(fields.tm_hour)
retn.minute = str(fields.tm_min)
return retn
if opts.yearly is not None:
fields = opts.yearly.split(':')
if len(fields) != 4:
raise ValueError(f'Failed to parse parameter {opts.yearly}')
retn.year = '+1'
retn.month, retn.day, retn.hour, retn.minute = fields
return retn
return None
async def _handle_add(self, core, opts):
incunit = None
incval = None
reqdict = {}
valinfo = { # unit: (minval, next largest unit)
'month': (1, 'year'),
'dayofmonth': (1, 'month'),
'hour': (0, 'day'),
'minute': (0, 'hour'),
}
try:
alias_opts = self._parse_alias(opts)
except ValueError as e:
self.printf(f'Error: Failed to parse ..ly parameter: {" ".join(e.args)}')
return
if alias_opts:
if opts.year or opts.month or opts.day or opts.hour or opts.minute:
self.printf('Error: may not use both alias (..ly) and explicit options at the same time')
return
opts = alias_opts
for optname in ('year', 'month', 'day', 'hour', 'minute'):
optval = getattr(opts, optname, None)
if optval is None:
if incunit is None and not reqdict:
continue
# The option isn't set, but a higher unit is. Go ahead and set the required part to the lowest valid
# value, e.g. so -m 2 would run on the *first* of every other month at midnight
if optname == 'day':
reqdict['dayofmonth'] = 1
else:
reqdict[optname] = valinfo[optname][0]
continue
isreq = not optval.startswith('+')
if optname == 'day':
unit, val = self._parse_day(optval)
if val is None:
self.printf(f'Error: failed to parse day value "{optval}"')
return
if unit == 'dayofweek':
if incunit is not None:
self.printf('Error: May not provide a recurrence value with day of week')
return
if reqdict:
self.printf('Error: may not fix month or year with day of week')
return
incunit, incval = unit, val
elif unit == 'day':
incunit, incval = unit, val
else:
assert unit == 'dayofmonth'
reqdict[unit] = val
continue
if not isreq:
if incunit is not None:
self.printf('Error: may not provide more than 1 recurrence parameter')
return
if reqdict:
self.printf('Error: fixed unit may not be larger than recurrence unit')
return
incunit = optname
incval = self._parse_incval(optname, optval)
if incval is None:
self.printf('Error: failed to parse parameter')
return
continue
if optname == 'year':
self.printf('Error: year may not be a fixed value')
return
reqval = self._parse_req(optname, optval)
if reqval is None:
self.printf(f'Error: failed to parse fixed parameter "{optval}"')
return
reqdict[optname] = reqval
# If not set, default (incunit, incval) to (1, the next largest unit)
if incunit is None:
if not reqdict:
self.printf('Error: must provide at least one optional argument')
return
requnit = next(iter(reqdict)) # the first key added is the biggest unit
incunit = valinfo[requnit][1]
incval = 1
cdef = {'storm': opts.query,
'reqs': reqdict,
'incunit': incunit,
'incvals': incval,
}
newcdef = await core.addCronJob(cdef)
self.printf(f'Created cron job {newcdef["iden"]}')
@staticmethod
def _format_timestamp(ts):
# N.B. normally better to use fromtimestamp with UTC timezone, but we don't want timezone to print out
return datetime.datetime.utcfromtimestamp(ts).isoformat(timespec='minutes')
async def _handle_list(self, core, opts):
cronlist = await core.listCronJobs()
if not cronlist:
self.printf('No cron jobs found')
return
self.printf(
f'{"user":10} {"iden":10} {"en?":3} {"rpt?":4} {"now?":4} {"err?":4} '
f'{"# start":7} {"last start":16} {"last end":16} {"query"}')
for cron in cronlist:
iden = cron.get('iden')
idenf = iden[:8] + '..'
user = cron.get('username') or '<None>'
query = cron.get('query') or '<missing>'
isrecur = 'Y' if cron.get('recur') else 'N'
isrunning = 'Y' if cron.get('isrunning') else 'N'
enabled = 'Y' if cron.get('enabled') else 'N'
startcount = cron.get('startcount') or 0
laststart = cron.get('laststarttime')
laststart = 'Never' if laststart is None else self._format_timestamp(laststart)
lastend = cron.get('lastfinishtime')
lastend = 'Never' if lastend is None else self._format_timestamp(lastend)
result = cron.get('lastresult')
iserr = 'X' if result is not None and not result.startswith('finished successfully') else ' '
self.printf(
f'{user:10} {idenf:10} {enabled:3} {isrecur:4} {isrunning:4} {iserr:4} '
f'{startcount:7} {laststart:16} {lastend:16} {query}')
async def _handle_mod(self, core, opts):
prefix = opts.prefix
iden = await self._match_idens(core, prefix)
if iden is None:
return
await core.updateCronJob(iden, opts.query)
self.printf(f'Modified cron job {iden}')
async def _handle_enable(self, core, opts):
prefix = opts.prefix
iden = await self._match_idens(core, prefix)
if iden is None:
return
await core.enableCronJob(iden)
self.printf(f'Enabled cron job {iden}')
async def _handle_disable(self, core, opts):
prefix = opts.prefix
iden = await self._match_idens(core, prefix)
if iden is None:
return
await core.disableCronJob(iden)
self.printf(f'Disabled cron job {iden}')
async def _handle_del(self, core, opts):
prefix = opts.prefix
iden = await self._match_idens(core, prefix)
if iden is None:
return
await core.delCronJob(iden)
self.printf(f'Deleted cron job {iden}')
async def _handle_stat(self, core, opts):
''' Prints details about a particular cron job. Not actually a different API call '''
prefix = opts.prefix
crons = await core.listCronJobs()
idens = [cron['iden'] for cron in crons]
matches = [iden for iden in idens if iden.startswith(prefix)]
if len(matches) == 0:
self.printf('Error: provided iden does not match any valid authorized cron job')
return
elif len(matches) > 1:
self.printf('Error: provided iden matches more than one cron job')
return
iden = matches[0]
cron = [cron for cron in crons if cron.get('iden') == iden][0]
user = cron.get('username') or '<None>'
query = cron.get('query') or '<missing>'
isrecur = 'Yes' if cron.get('recur') else 'No'
enabled = 'Yes' if cron.get('enabled') else 'No'
startcount = cron.get('startcount') or 0
recs = cron.get('recs', [])
laststart = cron.get('laststarttime')
lastend = cron.get('lastfinishtime')
laststart = 'Never' if laststart is None else self._format_timestamp(laststart)
lastend = 'Never' if lastend is None else self._format_timestamp(lastend)
lastresult = cron.get('lastresult') or '<None>'
self.printf(f'iden: {iden}')
self.printf(f'user: {user}')
self.printf(f'enabled: {enabled}')
self.printf(f'recurring: {isrecur}')
self.printf(f'# starts: {startcount}')
self.printf(f'last start time: {laststart}')
self.printf(f'last end time: {lastend}')
self.printf(f'last result: {lastresult}')
self.printf(f'query: {query}')
if not recs:
self.printf(f'entries: <None>')
else:
self.printf(f'entries: {"incunit":10} {"incval":6} {"required"}')
for reqdict, incunit, incval in recs:
reqdict = reqdict or '<None>'
incunit = '<None>' if incunit is None else incunit
incval = '<None>' if incval is None else incval
self.printf(f' {incunit:10} {incval:6} {reqdict}')
[docs] async def runCmdOpts(self, opts):
s_common.deprdate('cmdr> cron', s_common._splicedepr)
line = opts.get('line')
if line is None:
self.printf(self.__doc__)
return
core = self.getCmdItem()
argv = s_parser.Parser(line).cmdrargs()
try:
opts = self._make_argparser().parse_args(argv)
except s_exc.ParserExit:
return
handlers = {
'add': self._handle_add,
'del': self._handle_del,
'rm': self._handle_del,
'disable': self._handle_disable,
'enable': self._handle_enable,
'list': self._handle_list,
'ls': self._handle_list,
'mod': self._handle_mod,
'edit': self._handle_mod,
'stat': self._handle_stat,
}
await handlers[opts.cmd](core, opts)
[docs]class At(s_cli.Cmd):
'''
Adds a non-recurring cron job.
It will execute a Storm query at one or more specified times.
List/details/deleting cron jobs created with 'at' use the same commands as
other cron jobs: cron list/stat/del respectively.
Syntax:
at (time|+time delta)+ {query}
Notes:
This command accepts one or more time specifications followed by exactly
one storm query in curly braces. Each time specification may be in synapse
time delta format (e.g + 1 day) or synapse time format (e.g.
20501217030432101). Seconds will be ignored, as cron jobs' granularity is
limited to minutes.
All times are interpreted as UTC.
The other option for time specification is a relative time from now. This
consists of a plus sign, a positive integer, then one of 'minutes, hours,
days'.
Note that the record for a cron job is stored until explicitly deleted via
"cron del".
Examples:
# Run a storm query in 5 minutes
at +5 minutes {[inet:ipv4=1]}
# Run a storm query tomorrow and in a week
at +1 day +7 days {[inet:ipv4=1]}
# Run a query at the end of the year Zulu
at 20181231Z2359 {[inet:ipv4=1]}
'''
_cmd_name = 'at'
_cmd_syntax = (
('line', {'type': 'glob'}), # type: ignore
)
def _make_argparser(self):
parser = s_cmd.Parser(prog='at', outp=self, description=self.__doc__)
parser.add_argument('args', nargs='+', help='date | delta| {query})')
return parser
[docs] async def runCmdOpts(self, opts):
s_common.deprdate('cmdr> at', s_common._splicedepr)
line = opts.get('line')
if line is None:
self.printf(self.__doc__)
return
core = self.getCmdItem()
argv = s_parser.Parser(line).cmdrargs()
# Currently, using an argparser is overkill for this command. Using for future extensibility (and help).
try:
opts = self._make_argparser().parse_args(argv)
except s_exc.ParserExit:
return
query = None
consumed_next = False
tslist = []
# TODO: retrieve time from cortex in case of wrong cmdr time
now = time.time()
query = opts.args[-1]
for pos, arg in enumerate(opts.args[:-1]):
try:
if consumed_next:
consumed_next = False
continue
if arg.startswith('+'):
if arg[-1].isdigit():
if pos == len(opts.args) - 2:
self.printf('Time delta missing unit')
return
arg = f'{arg} {opts.args[pos + 1]}'
consumed_next = True
ts = now + s_time.delta(arg) / 1000.0
tslist.append(ts)
continue
ts = s_time.parse(arg) / 1000.0
tslist.append(ts)
except (ValueError, s_exc.BadTypeValu):
self.printf(f'Error: Trouble parsing "{arg}"')
return
if query is None:
self.printf('Error: Missing query argument')
return
def _ts_to_reqdict(ts):
dt = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
return {
'minute': dt.minute,
'hour': dt.hour,
'dayofmonth': dt.day,
'month': dt.month,
'year': dt.year
}
if not tslist:
self.printf('Error: at least one requirement must be provided')
return
reqdicts = [_ts_to_reqdict(ts) for ts in tslist]
cdef = {'storm': query,
'reqs': reqdicts,
'incunit': None,
'incvals': None,
}
newcdef = await core.addCronJob(cdef)
self.printf(f'Created cron job {newcdef["iden"]}')