'''
Time related utilities for synapse "epoch millis" time values.
'''
import logging
import datetime
import calendar
from dateutil.relativedelta import relativedelta
import pytz
import regex
import synapse.exc as s_exc
import synapse.lookup.timezones as s_l_timezones
logger = logging.getLogger(__name__)
EPOCH = datetime.datetime(1970, 1, 1)
onesec = 1000
onemin = 60000
onehour = 3600000
oneday = 86400000
timeunits = {
'sec': onesec,
'secs': onesec,
'seconds': onesec,
'min': onemin,
'mins': onemin,
'minute': onemin,
'minutes': onemin,
'hour': onehour,
'hours': onehour,
'day': oneday,
'days': oneday,
}
tzcat = '|'.join(sorted(s_l_timezones.getTzNames(), key=lambda x: len(x), reverse=True))
unitcat = '|'.join(sorted(timeunits.keys(), key=lambda x: len(x), reverse=True))
tz_re = regex.compile(
r'\d(?P<tzstr>\s?(?:'
r'(?P<tzname>%s)|'
r'(?:(?P<tzrel>\-|\+)(?P<tzhr>\d{1,2}):?(?P<tzmin>\d{2})))'
r')(?:[\-|\+]\d+(?:%s))?$' % (tzcat, unitcat),
flags=regex.IGNORECASE
)
daycat = '|'.join(calendar.day_abbr[i].lower() for i in range(7))
monthcat = '|'.join(calendar.month_abbr[i].lower() for i in range(1, 13))
rfc822_re = regex.compile(r'((?:%s),)?\d{1,2}(?:%s)\d{4}\d{2}:\d{2}:\d{2}' % (daycat, monthcat))
rfc822_fmt = '%d%b%Y%H:%M:%S'
def _rawparse(text, base=None, chop=False):
otext = text
text = text.strip().lower().replace(' ', '')
parsed_tz = False
if base is None:
text, base = parsetz(text)
if base != 0:
parsed_tz = True
# regex match is ~10x faster than strptime, so optimize
# for the case that *most* datetimes will not be RFC822
rfc822_match = rfc822_re.match(text)
if rfc822_match is not None:
# remove leading day since it is not used in strptime
if grp := rfc822_match.groups()[0]:
text = text.replace(grp, '', 1)
try:
dt = datetime.datetime.strptime(text, rfc822_fmt)
return dt, base, len(text)
except ValueError as e:
raise s_exc.BadTypeValu(mesg=f'Error parsing time as RFC822 "{otext}"; {str(e)}', valu=otext)
text = (''.join([c for c in text if c.isdigit()]))
if chop:
text = text[:17]
tlen = len(text)
try:
if tlen == 4:
if parsed_tz:
raise s_exc.BadTypeValu(mesg=f'Not enough information to parse timezone properly for {otext}.',
valu=otext)
dt = datetime.datetime.strptime(text, '%Y')
elif tlen == 6:
if parsed_tz:
raise s_exc.BadTypeValu(mesg=f'Not enough information to parse timezone properly for {otext}.',
valu=otext)
dt = datetime.datetime.strptime(text, '%Y%m')
elif tlen == 8:
if parsed_tz:
raise s_exc.BadTypeValu(mesg=f'Not enough information to parse timezone properly for {otext}.',
valu=otext)
dt = datetime.datetime.strptime(text, '%Y%m%d')
elif tlen == 10:
if parsed_tz:
raise s_exc.BadTypeValu(mesg=f'Not enough information to parse timezone properly for {otext}.',
valu=otext)
dt = datetime.datetime.strptime(text, '%Y%m%d%H')
elif tlen == 12:
dt = datetime.datetime.strptime(text, '%Y%m%d%H%M')
elif tlen == 14:
dt = datetime.datetime.strptime(text, '%Y%m%d%H%M%S')
elif 15 <= tlen <= 20:
dt = datetime.datetime.strptime(text, '%Y%m%d%H%M%S%f')
else:
raise s_exc.BadTypeValu(valu=otext, name='time',
mesg=f'Unknown time format for {otext}')
except ValueError as e:
raise s_exc.BadTypeValu(mesg=f'Error parsing time "{otext}"; {str(e)}', valu=otext)
return dt, base, tlen
[docs]
def parse(text, base=None, chop=False):
'''
Parse a time string into an epoch millis value.
Args:
text (str): Time string to parse
base (int or None): Milliseconds to offset the time from
chop (bool): Whether to chop the digit-only string to 17 chars
Returns:
int: Epoch milliseconds
'''
dtraw, base, tlen = _rawparse(text, base=base, chop=chop)
return int((dtraw - EPOCH).total_seconds() * 1000 + base)
[docs]
def wildrange(text):
'''
Parse an interval from a wild card time stamp: 2021/10/31*
'''
dttick, base, tlen = _rawparse(text)
if tlen not in (4, 6, 8, 10, 12, 14):
mesg = f'Time wild card position not supported for {text}'
raise s_exc.BadTypeValu(mesg=mesg, valu=text)
if tlen == 4:
dttock = dttick + relativedelta(years=1)
elif tlen == 6:
dttock = dttick + relativedelta(months=1)
elif tlen == 8:
dttock = dttick + relativedelta(days=1)
elif tlen == 10:
dttock = dttick + relativedelta(hours=1)
elif tlen == 12:
dttock = dttick + relativedelta(minutes=1)
else: # tlen = 14
dttock = dttick + relativedelta(seconds=1)
tick = int((dttick - EPOCH).total_seconds() * 1000 + base)
tock = int((dttock - EPOCH).total_seconds() * 1000 + base)
return (tick, tock)
[docs]
def parsetz(text):
'''
Parse timezone from time string, with UTC as the default.
Args:
text (str): Time string
Returns:
tuple: A tuple of text with tz chars removed and base milliseconds to offset time.
'''
match = tz_re.search(text)
if match is None:
return text, 0
tzrel = match['tzrel']
if tzrel is not None:
base = onehour * int(match['tzhr']) + onemin * int(match['tzmin'])
if tzrel == '+':
base *= -1
if abs(base) >= oneday:
raise s_exc.BadTypeValu(mesg=f'Timezone offset must be between +/- 24 hours for {text}',
valu=text, name='time')
return text.replace(match['tzstr'], '', 1), base
offset, _ = s_l_timezones.getTzOffset(match['tzname'])
if offset is None: # pragma: no cover
raise s_exc.BadTypeValu(mesg=f'Unknown timezone for {text}', valu=text, name='time') from None
base = offset * -1
return text.replace(match['tzstr'], '', 1), base
[docs]
def repr(tick, pack=False):
'''
Return a date string for an epoch-millis timestamp.
Args:
tick (int): The timestamp in milliseconds since the epoch.
Returns:
(str): A date time string
'''
if tick == 0x7fffffffffffffff:
return '?'
dt = EPOCH + datetime.timedelta(milliseconds=tick)
millis = dt.microsecond / 1000
if pack:
return '%d%.2d%.2d%.2d%.2d%.2d%.3d' % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, millis)
return '%d/%.2d/%.2d %.2d:%.2d:%.2d.%.3d' % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, millis)
[docs]
def day(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).day
[docs]
def year(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).year
[docs]
def month(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).month
[docs]
def hour(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).hour
[docs]
def minute(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).minute
[docs]
def second(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).second
[docs]
def dayofmonth(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).day - 1
[docs]
def dayofweek(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).weekday()
[docs]
def dayofyear(tick):
return (EPOCH + datetime.timedelta(milliseconds=tick)).timetuple().tm_yday - 1
[docs]
def ival(*times):
times = [t for t in times if t is not None]
minv = min(times)
maxv = max(times)
if minv == maxv:
maxv += 1
return (minv, maxv)
# TODO: use synapse.lib.syntax once it gets cleaned up
def _noms(text, offs, cset):
begin = offs
while len(text) > offs and text[offs] in cset:
offs += 1
return text[begin:offs], offs
[docs]
def delta(text):
'''
Parse a simple time delta string and return the delta.
'''
otext = text
text = text.strip().lower()
_, offs = _noms(text, 0, ' \t\r\n')
sign = '+'
if text and text[0] in ('+', '-'):
sign = text[0]
offs += 1
_, offs = _noms(text, offs, ' \t\r\n')
sizetext, offs = _noms(text, offs, '0123456789')
_, offs = _noms(text, offs, ' \t\r\n')
unittext = text[offs:]
size = int(sizetext, 0)
if sign == '-':
size = -size
base = timeunits.get(unittext)
if base is None:
mesg = f'unknown time delta units: {unittext} for {otext}'
raise s_exc.BadTypeValu(name='time', valu=otext, mesg=mesg)
return size * base
[docs]
def toUTC(tick, fromzone):
try:
tz = pytz.timezone(fromzone)
except pytz.exceptions.UnknownTimeZoneError as e:
mesg = f'Unknown timezone: {fromzone}'
raise s_exc.BadArg(mesg=mesg) from e
base = datetime.datetime(1970, 1, 1, tzinfo=tz) + datetime.timedelta(milliseconds=tick)
return int(base.astimezone(pytz.UTC).timestamp() * 1000)