import sys
import asyncio
import decimal
import logging
import binascii
import collections
import regex
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.lib.chop as s_chop
import synapse.lib.node as s_node
import synapse.lib.time as s_time
import synapse.lib.cache as s_cache
import synapse.lib.layer as s_layer
import synapse.lib.config as s_config
import synapse.lib.msgpack as s_msgpack
import synapse.lib.grammar as s_grammar
import synapse.lib.stormtypes as s_stormtypes
logger = logging.getLogger(__name__)
[docs]
class Type:
    '''
    Base class for data model types.

    A Type normalizes python values (``norm``), renders them for display
    (``repr``), builds comparison functions for filters (``cmpr`` and the
    ``_ctorCmpr*`` constructors) and translates lift operations into storage
    comparators (``getStorCmprs``).  Subclasses customize behavior by
    overriding ``postTypeInit`` and registering norm functions, comparator
    constructors and storage lift handlers.
    '''

    _opt_defs = ()
    stortype: int = None  # type: ignore

    # a fast-access way to determine if the type is an array
    # ( due to hot-loop needs in the storm runtime )
    isarray = False

    def __init__(self, modl, name, info, opts):
        '''
        Construct a new Type object.

        Args:
            modl (synapse.datamodel.DataModel): The data model instance.
            name (str): The name of the type.
            info (dict): The type info (docs etc).
            opts (dict): Options that are specific to the type.
        '''
        # these fields may be referenced by callers
        self.modl = modl
        self.name = name
        self.info = info
        self.form = None  # this will reference a Form() if the type is a form
        self.subof = None  # This references the name that a type was extended from.

        # NOTE: this mutates the info dict handed in by the caller.
        self.info.setdefault('bases', ('base',))

        # class level option defaults, overridden by the caller's opts
        self.opts = dict(self._opt_defs)
        self.opts.update(opts)

        self._type_norms = {}  # python type to norm function map str: _norm_str
        self._cmpr_ctors = {}  # cmpr string to filter function constructor map
        self._cmpr_ctor_lift = {}  # if set, create a cmpr which is passed along with indx ops

        self.setCmprCtor('=', self._ctorCmprEq)
        self.setCmprCtor('!=', self._ctorCmprNe)
        self.setCmprCtor('~=', self._ctorCmprRe)
        self.setCmprCtor('^=', self._ctorCmprPref)
        self.setCmprCtor('in=', self._ctorCmprIn)
        self.setCmprCtor('range=', self._ctorCmprRange)

        self.setNormFunc(s_node.Node, self._normStormNode)

        # lift comparator name -> handler returning storage comparator tuples
        self.storlifts = {
            '=': self._storLiftNorm,
            '~=': self._storLiftRegx,
            '?=': self._storLiftSafe,
            'in=': self._storLiftIn,
            'range=': self._storLiftRange,
        }

        self.locked = False
        self.deprecated = bool(self.info.get('deprecated', False))

        # subclass hook; runs after the defaults above are registered so
        # subclasses may replace or extend them.
        self.postTypeInit()

        # float option values are converted to strings before the options
        # are flattened and hashed into the type hash.
        normopts = dict(self.opts)
        for optn, valu in normopts.items():
            if isinstance(valu, float):
                normopts[optn] = str(valu)

        ctor = '.'.join([self.__class__.__module__, self.__class__.__qualname__])
        self.typehash = sys.intern(s_common.guid((ctor, s_common.flatten(normopts))))

    def _storLiftSafe(self, cmpr, valu):
        '''
        Handle the "?=" lift: behave like "=" but return no comparators
        (rather than raising) when the value can not be handled.
        '''
        try:
            return self.storlifts['=']('=', valu)

        except asyncio.CancelledError:  # pragma: no cover  TODO:  remove once >= py 3.8 only
            raise

        except Exception:
            return ()

    def _storLiftIn(self, cmpr, valu):
        '''
        Handle the "in=" lift by concatenating the "=" comparators of each value.
        '''
        retn = []
        for realvalu in valu:
            retn.extend(self.getStorCmprs('=', realvalu))
        return retn

    def _storLiftNorm(self, cmpr, valu):
        # NOTE: this may also be used for any other supported
        #       lift operation that requires a simple norm(valu)
        norm, info = self.norm(valu)
        return ((cmpr, norm, self.stortype),)

    def _storLiftRange(self, cmpr, valu):
        '''
        Handle the "range=" lift by norming both ends of a (min, max) pair.
        '''
        minv, minfo = self.norm(valu[0])
        maxv, maxfo = self.norm(valu[1])
        return ((cmpr, (minv, maxv), self.stortype),)

    def _storLiftRegx(self, cmpr, valu):
        # the regular expression text is passed through without norming
        return ((cmpr, valu, self.stortype),)

    def getStorCmprs(self, cmpr, valu):
        '''
        Get the storage comparators for lifting by the given comparator and value.

        Args:
            cmpr (str): The lift comparator name (for example "=").
            valu: The right-hand-side value for the lift.

        Returns:
            A sequence of (cmpr, valu, stortype) tuples.

        Raises:
            s_exc.NoSuchCmpr: If the type has no lift handler for cmpr.
        '''
        func = self.storlifts.get(cmpr)
        if func is None:
            mesg = f'Type ({self.name}) has no cmpr: "{cmpr}".'
            raise s_exc.NoSuchCmpr(mesg=mesg)

        return func(cmpr, valu)

    def getStorNode(self, form):
        '''
        Build a (buid, info) storage node tuple describing this type.

        Args:
            form: The form used to represent types; its type norms the type
                name and its props norm the "doc", "ctor", "opts" and "subof"
                values (props the form does not define are skipped).

        Returns:
            tuple: (buid, {'ndef': ndef, 'props': props}).
        '''
        ndef = (form.name, form.type.norm(self.name)[0])
        buid = s_common.buid(ndef)

        ctor = '.'.join([self.__class__.__module__, self.__class__.__qualname__])
        props = {
            'doc': self.info.get('doc'),
            'ctor': ctor,
        }

        opts = dict(self.opts)
        if opts:
            props['opts'] = opts

        if self.subof is not None:
            props['subof'] = self.subof

        pnorms = {}
        for prop, valu in props.items():
            formprop = form.props.get(prop)
            if formprop is not None and valu is not None:
                pnorms[prop] = formprop.type.norm(valu)[0]

        return (buid, {
            'ndef': ndef,
            'props': pnorms,
        })

    def getCompOffs(self, name):
        '''
        If this type is a compound, return the field offset for the given
        property name or None.
        '''
        return None

    def _normStormNode(self, node):
        # norm a storm node by norming the value half of its ndef
        return self.norm(node.ndef[1])

    def pack(self):
        '''
        Return a dictionary with copies of the type info and opts plus the stortype.
        '''
        return {
            'info': dict(self.info),
            'opts': dict(self.opts),
            'stortype': self.stortype,
        }

    def getTypeDef(self):
        '''
        Return the type definition tuple: (name, (basename, opts), info).

        The info is a copy of the type info with the stortype added.
        '''
        basename = self.info['bases'][-1]
        info = self.info.copy()
        info['stortype'] = self.stortype
        return (self.name, (basename, self.opts), info)

    def getTypeVals(self, valu):
        '''
        Yield the values represented by valu ( the base type yields valu itself ).
        '''
        yield valu

    def setCmprCtor(self, name, func):
        '''
        Set a comparator ctor for a given named comparison operation.

        Args:
            name (str): Name of the comparison operation.
            func: Function which returns a comparator.

        Notes:
            Comparator ctors should expect to get the right-hand-side of the
            comparison as their argument, and the returned function should
            expect to get the left hand side of the comparison and return a
            boolean from there.
        '''
        self._cmpr_ctors[name] = func

    def getCmprCtor(self, name):
        '''
        Return the comparator ctor registered for name or None.
        '''
        return self._cmpr_ctors.get(name)

    def setLiftHintCmprCtor(self, name, func):
        '''
        Set the comparator ctor used to build lift hint comparators for name.
        '''
        self._cmpr_ctor_lift[name] = func

    def getLiftHintCmprCtor(self, name):
        '''
        Return the lift hint comparator ctor registered for name or None.
        '''
        return self._cmpr_ctor_lift.get(name)

    def getLiftHintCmpr(self, valu, cmpr):
        '''
        Return a lift hint comparator for the value or None if no ctor is set.

        Args:
            valu: The right-hand-side value handed to the ctor.
            cmpr (str): The comparator name.
        '''
        ctor = self.getLiftHintCmprCtor(cmpr)
        if ctor:
            return ctor(valu)
        return None

    def cmpr(self, val1, name, val2):
        '''
        Compare the two values using the given type specific comparator.

        Args:
            val1: The left-hand-side value ( normed before comparing ).
            name (str): The comparator name.
            val2: The right-hand-side value ( normed unless name is "~=" ).

        Raises:
            s_exc.NoSuchCmpr: If no comparator ctor is registered for name.
        '''
        ctor = self.getCmprCtor(name)
        if ctor is None:
            raise s_exc.NoSuchCmpr(cmpr=name, name=self.name)

        norm1 = self.norm(val1)[0]

        if name != '~=':
            # Don't norm regex patterns
            val2 = self.norm(val2)[0]

        return ctor(val2)(norm1)

    def _ctorCmprEq(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return norm == valu
        return cmpr

    def _ctorCmprNe(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return norm != valu
        return cmpr

    def _ctorCmprPref(self, valu):
        # prefix comparison is done against the repr() of the value
        text = str(valu)

        def cmpr(valu):
            vtxt = self.repr(valu)
            return vtxt.startswith(text)

        return cmpr

    def _ctorCmprRe(self, text):
        # case insensitive regex search against the repr() of the value
        regx = regex.compile(text, flags=regex.I)

        def cmpr(valu):
            vtxt = self.repr(valu)
            return regx.search(vtxt) is not None

        return cmpr

    def _ctorCmprIn(self, vals):
        norms = [self.norm(v)[0] for v in vals]

        def cmpr(valu):
            return valu in norms
        return cmpr

    def _ctorCmprRange(self, vals):
        # the right-hand-side must be a (min, max) pair
        if not isinstance(vals, (list, tuple)):
            raise s_exc.BadCmprValu(name=self.name, valu=vals, cmpr='range=')

        if len(vals) != 2:
            raise s_exc.BadCmprValu(name=self.name, valu=vals, cmpr='range=')

        minv = self.norm(vals[0])[0]
        maxv = self.norm(vals[1])[0]

        def cmpr(valu):
            return minv <= valu <= maxv
        return cmpr

    def setNormFunc(self, typo, func):
        '''
        Register a normalizer function for a given python type.

        Args:
            typo (type): A python type/class to normalize.
            func (function): A callback which normalizes a python value.
        '''
        self._type_norms[typo] = func

    def postTypeInit(self):
        '''
        Subclass hook called from __init__ once the default comparators and
        lift handlers are registered. The base implementation does nothing.
        '''
        pass

    def norm(self, valu):
        '''
        Normalize the value for a given type.

        Args:
            valu (obj): The value to normalize.

        Returns:
            ((obj,dict)): The normalized valu, info tuple.

        Raises:
            s_exc.BadTypeValu: If no norm function is registered for the
                exact python type of valu.

        Notes:
            The info dictionary uses the following key conventions:
                subs (dict): The normalized sub-fields as name: valu entries.
        '''
        # NOTE: the lookup is by exact type(), so subclasses of a registered
        # python type are not matched.
        func = self._type_norms.get(type(valu))
        if func is None:
            raise s_exc.BadTypeValu(name=self.name, mesg='no norm for type: %r.' % (type(valu),))

        return func(valu)

    def repr(self, norm):
        '''
        Return a printable representation for the value.
        This may return a string or a tuple of values for display purposes.
        '''
        return str(norm)

    def merge(self, oldv, newv):
        '''
        Allow types to "merge" data from two sources based on value precedence.

        Args:
            oldv (object): The current value.
            newv (object): The updated value.

        Returns:
            (object): The merged value.
        '''
        return newv

    def extend(self, name, opts, info):
        '''
        Extend this type to construct a sub-type.

        Args:
            name (str): The name of the new sub-type.
            opts (dict): The type options for the sub-type.
            info (dict): The type info for the sub-type.

        Returns:
            (synapse.types.Type): A new sub-type instance.
        '''
        tifo = self.info.copy()
        tifo.update(info)

        # the sub-type's bases are this type's bases plus this type
        bases = self.info.get('bases') + (self.name,)
        tifo['bases'] = bases

        topt = self.opts.copy()
        topt.update(opts)

        tobj = self.__class__(self.modl, name, tifo, topt)
        tobj.subof = self.name
        return tobj

    def clone(self, opts):
        '''
        Create a new instance of this type with the specified options.

        Args:
            opts (dict): The type specific options for the new instance.
        '''
        topt = self.opts.copy()
        topt.update(opts)
        return self.__class__(self.modl, self.name, self.info, topt)

    def __eq__(self, othr):
        '''
        Two types are equal when both their names and their opts are equal.

        Objects which do not expose name/opts are not comparable, so
        NotImplemented is returned and python falls back to its default
        handling ( which makes ``typeobj == None`` evaluate to False rather
        than raising AttributeError ).
        '''
        # NOTE: __hash__ is intentionally left undefined; defining __eq__
        # without it keeps instances unhashable, as before.
        try:
            oname = othr.name
            oopts = othr.opts
        except AttributeError:
            return NotImplemented

        return self.name == oname and self.opts == oopts
[docs]
class Bool(Type):
    '''
    A boolean type which normalizes to the integers 0 and 1.
    '''

    stortype = s_layer.STOR_TYPE_U8

    def postTypeInit(self):
        '''
        Register the normalizers for each python type accepted as a bool.
        '''
        normfuncs = (
            (str, self._normPyStr),
            (int, self._normPyInt),
            (bool, self._normPyInt),
            (decimal.Decimal, self._normPyInt),
            (s_stormtypes.Number, self._normNumber),
        )
        for typo, func in normfuncs:
            self.setNormFunc(typo, func)

    def _normPyStr(self, valu):
        '''
        Norm a string: integer text is judged by truthiness, otherwise the
        text must be one of the known true/false words.
        '''
        asint = s_common.intify(valu)
        if asint is not None:
            return int(bool(asint)), {}

        word = valu.lower().strip()

        truthy = ('true', 't', 'y', 'yes', 'on')
        falsy = ('false', 'f', 'n', 'no', 'off')

        if word in truthy:
            return 1, {}

        if word in falsy:
            return 0, {}

        raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                mesg='Failed to norm bool')

    def _normPyInt(self, valu):
        # any truthy number is 1, anything else is 0
        flag = bool(valu)
        return int(flag), {}

    def _normNumber(self, valu):
        # storm Number objects carry their numeric value in .valu
        flag = bool(valu.valu)
        return int(flag), {}

    def repr(self, valu):
        '''
        Return "true" or "false" for the normalized value.
        '''
        if valu:
            return 'true'
        return 'false'
[docs]
class Array(Type):
    '''
    A type whose values are tuples of values of a single element type.

    Options read by this type:
        type (str): The name of the element type ( required ).
        typeopts (dict): Options used to clone the element type.
        uniq (bool): Drop duplicate elements, keeping the first occurrence.
        sorted (bool): Sort the normalized elements.
        split (str): Separator used to split string input and to join reprs.
    '''

    isarray = True

    def postTypeInit(self):
        '''
        Resolve the element type and register the array normalizers.

        Raises:
            s_exc.BadTypeDef: If the type option is missing, names an unknown
                type, or resolves to another Array type.
        '''
        self.isuniq = self.opts.get('uniq', False)
        self.issorted = self.opts.get('sorted', False)
        self.splitstr = self.opts.get('split', None)

        typename = self.opts.get('type')
        if typename is None:
            mesg = 'Array type requires type= option.'
            raise s_exc.BadTypeDef(mesg=mesg)

        typeopts = self.opts.get('typeopts', {})

        basetype = self.modl.type(typename)
        if basetype is None:
            mesg = f'Array type ({self.name}) based on unknown type: {typename}.'
            raise s_exc.BadTypeDef(mesg=mesg)

        self.arraytype = basetype.clone(typeopts)

        if isinstance(self.arraytype, Array):
            mesg = 'Array type of array values is not (yet) supported.'
            # pass the message by keyword like the other BadTypeDef raises
            # in this method so it is carried in the exception info.
            raise s_exc.BadTypeDef(mesg=mesg)

        if self.arraytype.deprecated:
            if self.info.get('custom'):
                mesg = f'The Array type {self.name} is based on a deprecated type {self.arraytype.name} type which ' \
                       f'which will be removed in 3.0.0'
                logger.warning(mesg)

        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(list, self._normPyTuple)
        self.setNormFunc(tuple, self._normPyTuple)

        # the storage type is the element storage type with the array flag set
        self.stortype = s_layer.STOR_FLAG_ARRAY | self.arraytype.stortype

    def _normPyStr(self, text):
        '''
        Norm a string by splitting it on the configured separator.

        Raises:
            s_exc.BadTypeValu: If the type has no split option.
        '''
        if self.splitstr is None:
            mesg = f'{self.name} type has no split-char defined.'
            raise s_exc.BadTypeValu(name=self.name, mesg=mesg)

        parts = [p.strip() for p in text.split(self.splitstr)]
        return self._normPyTuple(parts)

    def _normPyTuple(self, valu):
        '''
        Norm each element with the element type.

        Returns:
            tuple: (norms, {'adds': adds}) where norms is a tuple of the
            normed elements and adds collects the elements' own adds plus,
            when the element type is also a form, one (form, norm, info)
            entry per element.
        '''
        adds = []
        norms = []

        form = self.modl.form(self.arraytype.name)

        for item in valu:
            norm, info = self.arraytype.norm(item)
            adds.extend(info.get('adds', ()))
            if form is not None:
                adds.append((form.name, norm, info))
            norms.append(norm)

        if self.isuniq:
            # dict keys keep first-seen order while dropping duplicates
            norms = tuple(dict.fromkeys(norms))

        if self.issorted:
            norms = tuple(sorted(norms))

        return tuple(norms), {'adds': adds}

    def repr(self, valu):
        '''
        Return the element reprs as a list, or joined into one string when a
        split separator is configured.
        '''
        rval = [self.arraytype.repr(v) for v in valu]
        if self.splitstr:
            rval = self.splitstr.join(rval)
        return rval
[docs]
class Comp(Type):
    '''
    A compound type whose value is a tuple of named, typed fields.

    The "fields" option is a sequence of (name, typedef) pairs and the
    optional "sepr" option enables norming from ( and repr to ) a single
    separator-joined string.
    '''

    stortype = s_layer.STOR_TYPE_MSGP

    def getCompOffs(self, name):
        '''
        Return the positional offset of the named field or None.
        '''
        return self.fieldoffs.get(name)

    def postTypeInit(self):
        '''
        Register the normalizers and pre-compute the field offsets.
        '''
        for seqtype in (list, tuple):
            self.setNormFunc(seqtype, self._normPyTuple)

        self.sepr = self.opts.get('sepr')
        if self.sepr is not None:
            # string input is only accepted when a separator is configured
            self.setNormFunc(str, self._normPyStr)

        fields = self.opts.get('fields', ())

        # calc and save field offsets...
        self.fieldoffs = {fname: offs for offs, (fname, _ftype) in enumerate(fields)}

        # field types are resolved lazily on first use
        self.tcache = FieldHelper(self.modl, self.name, fields)

    def _normPyTuple(self, valu):
        '''
        Norm a sequence holding exactly one item per configured field.

        Returns:
            tuple: (norm, {'subs': subs, 'adds': adds}) where subs maps each
            field name ( and "field:sub" for nested subs ) to its normed
            value.

        Raises:
            s_exc.BadTypeValu: If the number of items does not match the
                number of fields.
        '''
        fields = self.opts.get('fields')
        if len(fields) != len(valu):
            raise s_exc.BadTypeValu(name=self.name, fields=fields, numitems=len(valu),
                                    mesg=f'invalid number of fields given for norming: {s_common.trimText(repr(valu))}')

        subs = {}
        adds = []
        norms = []

        for offs, (fname, _) in enumerate(fields):

            ftype = self.tcache[fname]

            fnorm, finfo = ftype.norm(valu[offs])

            subs[fname] = fnorm
            norms.append(fnorm)

            # lift the field's own subs up as "<field>:<sub>"
            for subname, subvalu in finfo.get('subs', {}).items():
                subs[f'{fname}:{subname}'] = subvalu

            fform = self.modl.form(ftype.name)
            if fform is not None:
                adds.append((fform.name, fnorm, finfo))

            adds.extend(finfo.get('adds', ()))

        return tuple(norms), {'subs': subs, 'adds': adds}

    def _normPyStr(self, text):
        # split on the separator and norm the parts as a sequence
        parts = text.split(self.sepr)
        return self._normPyTuple(parts)

    def repr(self, valu):
        '''
        Return the field reprs joined by the separator when one is
        configured, otherwise as a tuple.
        '''
        fields = self.opts.get('fields')

        reprs = []
        for item, (fname, _) in zip(valu, fields):
            reprs.append(self.tcache[fname].repr(item))

        if self.sepr is None:
            return tuple(reprs)

        return self.sepr.join(reprs)
[docs]
class FieldHelper(collections.defaultdict):
    '''
    Helper for Comp types. Performs Type lookup/creation upon first use.
    '''

    def __init__(self, modl, tname, fields):
        '''
        Args:
            modl: The data model used to resolve field types.
            tname (str): The name of the owning type ( used in warnings ).
            fields: An iterable of (name, typedef) pairs.
        '''
        super().__init__()
        self.modl = modl
        self.tname = tname
        self.fields = {fname: fdef for fname, fdef in fields}

    def __missing__(self, key):
        '''
        Resolve, cache and return the Type for the given field name.

        A field definition is either a type name or a (type name, opts) pair;
        the latter is resolved by cloning the named type with the opts.

        Raises:
            s_exc.BadTypeDef: If the field is not configured or its type is
                not present in the data model.
        '''
        fdef = self.fields.get(key)
        if not fdef:
            raise s_exc.BadTypeDef(valu=key, mesg='unconfigured field requested')

        if not isinstance(fdef, str):
            # val is a type, opts pair
            basename, opts = fdef
            basetype = self.modl.type(basename)
            if not basetype:
                raise s_exc.BadTypeDef(valu=fdef, mesg='type is not present in datamodel')
            ftype = basetype.clone(opts)
        else:
            ftype = self.modl.type(fdef)
            if not ftype:
                raise s_exc.BadTypeDef(valu=fdef, mesg='type is not present in datamodel')

        if ftype.deprecated:
            mesg = f'The type {self.tname} field {key} uses a deprecated ' \
                   f'type {ftype.name} which will removed in 3.0.0'
            logger.warning(mesg)

        self.setdefault(key, ftype)
        return ftype
[docs]
class Guid(Type):
    '''
    A GUID type. Strings are normalized to lowercase with dashes removed,
    "*" generates a new GUID and lists/tuples are hashed into a GUID.
    '''

    stortype = s_layer.STOR_TYPE_GUID

    def postTypeInit(self):
        '''
        Register the normalizers and the prefix ( "^=" ) lift handler.
        '''
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(list, self._normPyList)
        self.setNormFunc(tuple, self._normPyList)
        self.storlifts.update({
            '^=': self._storLiftPref,
        })

    def _storLiftPref(self, cmpr, valu):
        '''
        Build the storage comparator for a GUID prefix lift.

        Raises:
            s_exc.BadTypeValu: If the prefix can not be decoded as hex.
        '''
        try:
            byts = s_common.uhex(valu)
        except (binascii.Error, ValueError):
            # Hex decoding reports bad digits / odd length as binascii.Error,
            # but non-ASCII text as a plain ValueError; treat both as a bad
            # prefix ( matching the handling in Hex._normPyStr ).
            mesg = f'Invalid GUID prefix ({valu}). Must be even number of hex chars.'
            raise s_exc.BadTypeValu(mesg=mesg) from None

        return (
            ('^=', byts, self.stortype),
        )

    def _normPyList(self, valu):
        # a list/tuple is norm'd to the guid computed from its contents
        return s_common.guid(valu), {}

    def _normPyStr(self, valu):
        '''
        Norm a GUID string.

        Raises:
            s_exc.BadTypeValu: If the text is not a guid.
        '''
        if valu == '*':
            # "*" requests a newly generated guid
            valu = s_common.guid()
            return valu, {}

        valu = valu.lower().replace('-', '')
        if not s_common.isguid(valu):
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg='valu is not a guid.')

        return valu, {}
[docs]
class Hex(Type):
    '''
    A hexadecimal string type, normalized to lowercase hex digits.

    Options:
        size (int): Required width in hex characters ( 0 disables the check ).
        zeropad (int): Left-pad shorter values with "0" up to this width.
    '''

    stortype = s_layer.STOR_TYPE_UTF8

    _opt_defs = (
        ('size', 0),  # type: ignore
        ('zeropad', 0),
    )

    def postTypeInit(self):
        '''
        Validate the size/zeropad options and register norm and lift handlers.

        Raises:
            s_exc.BadConfValu: If size or zeropad is negative or odd.
        '''
        self._size = self.opts.get('size')

        # This is for backward compat with v2.142.x where zeropad was a bool
        zeropad = self.opts.get('zeropad')
        if isinstance(zeropad, bool):
            zeropad = self._size if zeropad else 0
        self._zeropad = zeropad

        # ( option name, value, negative message, odd message )
        # zero means no width check
        checks = (
            ('size', self._size, 'Size must be >= 0', 'Size must be a multiple of 2'),
            ('zeropad', self._zeropad, 'Zeropad must be >= 0', 'Zeropad must be a multiple of 2'),
        )
        for optname, optvalu, negmesg, oddmesg in checks:

            if optvalu < 0:
                raise s_exc.BadConfValu(name=optname, valu=optvalu,
                                        mesg=negmesg)

            if optvalu % 2 != 0:
                raise s_exc.BadConfValu(name=optname, valu=optvalu,
                                        mesg=oddmesg)

        if self._size:
            # never pad beyond the required width
            self._zeropad = min(self._zeropad, self._size)

        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(bytes, self._normPyBytes)
        self.storlifts.update({
            '=': self._storLiftEq,
            '^=': self._storLiftPref,
        })

    def _preNormHex(self, text):
        '''
        Lowercase and strip the text, then drop a leading "0x" and any
        spaces or colons.
        '''
        cleaned = text.strip().lower()
        if cleaned.startswith('0x'):
            cleaned = cleaned[2:]

        cleaned = cleaned.replace(' ', '')
        return cleaned.replace(':', '')

    def _storLiftEq(self, cmpr, valu):
        '''
        Handle "=" lifts; a string ending in "*" is lifted by prefix.
        '''
        if not isinstance(valu, str):
            return self._storLiftNorm(cmpr, valu)

        text = self._preNormHex(valu)
        if text.endswith('*'):
            return (
                ('^=', text[:-1], self.stortype),
            )

        return self._storLiftNorm(cmpr, text)

    def _storLiftPref(self, cmpr, valu):
        prefix = self._preNormHex(valu)
        return (
            ('^=', prefix, self.stortype),
        )

    def _normPyStr(self, valu):
        '''
        Norm a hex string.

        Raises:
            s_exc.BadTypeValu: If nothing remains after cleanup, the text is
                not valid hex, or the width does not match the size option.
        '''
        text = self._preNormHex(valu)

        if not text:
            raise s_exc.BadTypeValu(valu=text, name='hex',
                                    mesg='No string left after stripping')

        # left-pad with zeros ( a width of 0 leaves the text unchanged )
        text = text.rjust(self._zeropad, '0')

        try:
            # checks for valid hex width and does character
            # checking in C without using regex
            s_common.uhex(text)
        except (binascii.Error, ValueError) as e:
            raise s_exc.BadTypeValu(valu=text, name='hex', mesg=str(e)) from None

        if self._size and len(text) != self._size:
            raise s_exc.BadTypeValu(valu=text, reqwidth=self._size, name=self.name,
                                    mesg='invalid width')

        return text, {}

    def _normPyBytes(self, valu):
        # bytes are hex encoded and then norm'd like any other string
        hextext = s_common.ehex(valu)
        return self._normPyStr(hextext)
# Map of (size in bytes, signed) -> layer storage type for integers.
# Int.postTypeInit looks its "size" and "signed" options up in this table
# and rejects any combination which is not present.
intstors = {
    (1, True): s_layer.STOR_TYPE_I8,
    (2, True): s_layer.STOR_TYPE_I16,
    (4, True): s_layer.STOR_TYPE_I32,
    (8, True): s_layer.STOR_TYPE_I64,
    (16, True): s_layer.STOR_TYPE_I128,
    (1, False): s_layer.STOR_TYPE_U8,
    (2, False): s_layer.STOR_TYPE_U16,
    (4, False): s_layer.STOR_TYPE_U32,
    (8, False): s_layer.STOR_TYPE_U64,
    (16, False): s_layer.STOR_TYPE_U128,
}

# Largest magnitude accepted by HugeNum.norm (values above it, or below its
# negation, are rejected).
# NOTE(review): presumably derived from the hugenum storage encoding - confirm.
hugemax = 730750818665451459101842
[docs]
class HugeNum(Type):
    '''
    A large decimal number type, normalized to a fixed-point string.

    Options:
        units (dict): Optional map of unit suffix -> multiplier for text input.
        modulo: Optional modulus applied to normalized values.
    '''

    stortype = s_layer.STOR_TYPE_HUGENUM

    _opt_defs = (
        ('units', None),  # type: ignore
        ('modulo', None),  # type: ignore
    )

    def __init__(self, modl, name, info, opts):
        Type.__init__(self, modl, name, info, opts)

        for cmprname, ctor in (
            ('>', self._ctorCmprGt),
            ('<', self._ctorCmprLt),
            ('>=', self._ctorCmprGe),
            ('<=', self._ctorCmprLe),
        ):
            self.setCmprCtor(cmprname, ctor)

        self.storlifts.update({
            '<': self._storLiftNorm,
            '>': self._storLiftNorm,
            '<=': self._storLiftNorm,
            '>=': self._storLiftNorm,
            'range=': self._storLiftRange,
        })

        self.modulo = None
        self.units = {}

        unitdefs = self.opts.get('units')
        if unitdefs is not None:
            # convert each unit multiplier to a hugenum once, up front
            for unitname, mult in unitdefs.items():
                self.units[unitname] = s_common.hugenum(mult)

        modvalu = self.opts.get('modulo')
        if modvalu is not None:
            self.modulo = s_common.hugenum(modvalu)

    def _normHugeText(self, rawtext):
        '''
        Parse text of the form "<number>[<unit>]" into a hugenum.

        Commas and spaces are removed before parsing and a trailing unit is
        multiplied in from the configured units.

        Raises:
            s_exc.BadTypeValu: If the text does not start with a number or
                uses an unknown unit.
        '''
        text = rawtext.lower().strip()
        text = text.replace(',', '')
        text = text.replace(' ', '')

        try:
            numb, offs = s_grammar.chop_float(text, 0)
        except Exception:
            mesg = f'Value does not start with a number: "{rawtext}"'
            raise s_exc.BadTypeValu(mesg=mesg)

        huge = s_common.hugenum(numb)

        unit, offs = s_grammar.nom(text, offs, s_grammar.unitset)
        if not unit:
            return huge

        mult = self.units.get(unit)
        if mult is None:
            mesg = f'Unknown units for value: "{rawtext}"'
            raise s_exc.BadTypeValu(mesg=mesg)

        return s_common.hugemul(huge, mult)

    def norm(self, valu):
        '''
        Normalize a value to a hugenum string.

        Returns:
            tuple: (text, {}) where text is the fixed-point string form.

        Raises:
            s_exc.BadTypeValu: If the value is None, is not a valid number,
                or its magnitude exceeds hugemax.
        '''
        if valu is None:
            mesg = 'Hugenum type may not be null.'
            raise s_exc.BadTypeValu(mesg=mesg)

        try:

            if not isinstance(valu, str):
                huge = s_common.hugenum(valu)
            else:
                huge = self._normHugeText(valu)

            # behave modulo like int/float
            if self.modulo is not None:
                _, huge = s_common.hugemod(huge, self.modulo)
                if huge < 0:
                    huge = s_common.hugeadd(huge, self.modulo)

            huge = s_common.hugeround(huge)

        except decimal.DecimalException as e:
            mesg = f'Invalid hugenum: {e}'
            raise s_exc.BadTypeValu(name=self.name, valu=valu, mesg=mesg) from None

        if huge > hugemax:
            mesg = f'Value ({valu}) is too large for hugenum.'
            raise s_exc.BadTypeValu(mesg=mesg)

        # only negative values can reach this check with a large magnitude
        if abs(huge) > hugemax:
            mesg = f'Value ({valu}) is too small for hugenum.'
            raise s_exc.BadTypeValu(mesg=mesg)

        huge = s_common.hugeround(huge).normalize(s_common.hugectx)
        return '{:f}'.format(huge), {}

    # The comparator ctors convert both sides to hugenums so that the
    # comparison is numeric rather than textual.

    def _ctorCmprEq(self, text):
        base = s_common.hugenum(text)
        return lambda valu: s_common.hugenum(valu) == base

    def _ctorCmprGt(self, text):
        base = s_common.hugenum(text)
        return lambda valu: s_common.hugenum(valu) > base

    def _ctorCmprLt(self, text):
        base = s_common.hugenum(text)
        return lambda valu: s_common.hugenum(valu) < base

    def _ctorCmprGe(self, text):
        base = s_common.hugenum(text)
        return lambda valu: s_common.hugenum(valu) >= base

    def _ctorCmprLe(self, text):
        base = s_common.hugenum(text)
        return lambda valu: s_common.hugenum(valu) <= base

    def _storLiftRange(self, cmpr, valu):
        # norm both ends of the (min, max) pair
        lower = self.norm(valu[0])[0]
        upper = self.norm(valu[1])[0]
        return ((cmpr, (lower, upper), self.stortype),)
[docs]
class IntBase(Type):
    '''
    Shared base for integer-valued types.

    Adds the ordering comparators ( ">", "<", ">=", "<=" ) and their lift
    handlers, plus normalizers for Decimal and storm Number values which
    delegate to the subclass provided ``_normPyInt``.
    '''

    def __init__(self, modl, name, info, opts):

        Type.__init__(self, modl, name, info, opts)

        self.setCmprCtor('>=', self._ctorCmprGe)
        self.setCmprCtor('<=', self._ctorCmprLe)
        self.setCmprCtor('>', self._ctorCmprGt)
        self.setCmprCtor('<', self._ctorCmprLt)

        self.storlifts.update({
            '<': self._storLiftNorm,
            '>': self._storLiftNorm,
            '<=': self._storLiftNorm,
            '>=': self._storLiftNorm,
            'range=': self._storLiftRange,
        })

        self.setNormFunc(decimal.Decimal, self._normPyDecimal)
        self.setNormFunc(s_stormtypes.Number, self._normNumber)

    def _storLiftRange(self, cmpr, valu):
        # norm both ends of the (min, max) pair
        minv, minfo = self.norm(valu[0])
        maxv, maxfo = self.norm(valu[1])
        return ((cmpr, (minv, maxv), self.stortype),)

    def _ctorCmprGe(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return valu >= norm
        return cmpr

    def _ctorCmprLe(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return valu <= norm
        return cmpr

    def _ctorCmprGt(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return valu > norm
        return cmpr

    def _ctorCmprLt(self, text):
        norm, info = self.norm(text)

        def cmpr(valu):
            return valu < norm
        return cmpr

    def _toPyInt(self, valu):
        '''
        Convert a numeric value to a python int.

        Raises:
            s_exc.BadTypeValu: If the value has no integer form ( int()
                raises ValueError for NaN and OverflowError for infinity ).
        '''
        try:
            return int(valu)
        except (ValueError, OverflowError) as e:
            raise s_exc.BadTypeValu(name=self.name, valu=repr(valu), mesg=str(e)) from None

    def _normPyDecimal(self, valu):
        # truncate the decimal to an int and norm it as one
        return self._normPyInt(self._toPyInt(valu))

    def _normNumber(self, valu):
        # storm Number objects carry their numeric value in .valu
        return self._normPyInt(self._toPyInt(valu.valu))
[docs]
class Int(IntBase):
    '''
    An integer type with configurable storage size, bounds and enum names.
    '''

    _opt_defs = (
        ('size', 8),  # type: ignore # Set the storage size of the integer type in bytes.
        ('signed', True),
        ('enums:strict', True),

        # Note: currently unused
        ('fmt', '%d'),  # Set to an integer compatible format string to control repr.

        ('min', None),  # Set to a value to enforce minimum value for the type.
        ('max', None),  # Set to a value to enforce maximum value for the type.

        ('ismin', False),  # Set to True to enable ismin behavior on value merge.
        ('ismax', False),  # Set to True to enable ismax behavior on value merge.
    )

    def postTypeInit(self):
        '''
        Resolve the storage type, enum maps and value bounds from the options.

        Raises:
            s_exc.BadTypeDef: If the size/signed combination is unsupported,
                the enums contain duplicate values or names, or the min/max
                options are out of range or inverted.
        '''
        self.size = self.opts.get('size')
        self.signed = self.opts.get('signed')
        self.stortype = intstors.get((self.size, self.signed))
        if self.stortype is None:
            mesg = f'Invalid integer size ({self.size})'
            raise s_exc.BadTypeDef(mesg=mesg)

        self.enumnorm = {}  # lowercased enum name -> integer value
        self.enumrepr = {}  # integer value -> enum name
        self.enumstrict = self.opts.get('enums:strict')

        enums = self.opts.get('enums')
        if enums is not None:
            self.enumrepr.update(dict(enums))
            self.enumnorm.update({n.lower(): v for (v, n) in enums})

            # a shorter map means two enums shared a value ( or a name )
            if len(enums) != len(self.enumrepr):
                mesg = 'Number of enums does not match the number of string reprs.'
                raise s_exc.BadTypeDef(mesg=mesg,
                                       name=self.name)

            if len(enums) != len(self.enumnorm):
                mesg = 'Number of enums does not match the number of string norms.'
                raise s_exc.BadTypeDef(mesg=mesg,
                                       name=self.name)

        minval = self.opts.get('min')
        maxval = self.opts.get('max')

        # limits of a signed integer of this size
        minmin = -2 ** ((self.size * 8) - 1)
        if minval is None:
            minval = minmin

        maxmax = 2 ** ((self.size * 8) - 1) - 1
        if maxval is None:
            maxval = maxmax

        if minval < minmin or maxval > maxmax or maxval < minval:
            raise s_exc.BadTypeDef(self.opts, name=self.name)

        if not self.signed:
            self._indx_offset = 0
            self.minval = 0
            # NOTE(review): unsigned types are validated against, and capped
            # at, the *signed* maximum above, and any "min" option is replaced
            # by 0. Left exactly as-is; confirm whether this is intended.
            self.maxval = min(2 * maxval, maxval)
        else:
            self._indx_offset = maxmax + 1
            self.minval = max(minmin, minval)
            self.maxval = min(maxmax, maxval)

        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(int, self._normPyInt)
        self.setNormFunc(bool, self._normPyBool)
        self.setNormFunc(float, self._normPyFloat)

    def merge(self, oldv, newv):
        '''
        Merge two values, honoring the ismin / ismax options.

        Args:
            oldv (int): The current value.
            newv (int): The updated value.

        Returns:
            int: The smaller value for ismin types, the larger for ismax
            types, otherwise newv.
        '''
        if self.opts.get('ismin'):
            return min(oldv, newv)

        if self.opts.get('ismax'):
            return max(oldv, newv)

        return newv

    def _normPyStr(self, valu):
        '''
        Norm a string which is either an enum name ( case insensitive ) or
        integer text in any base accepted by int(valu, 0).

        Raises:
            s_exc.BadTypeValu: If the text is not a valid integer.
        '''
        if self.enumnorm:
            ival = self.enumnorm.get(valu.lower())
            if ival is not None:
                return self._normPyInt(ival)

        try:
            valu = int(valu, 0)
        except ValueError as e:
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg=str(e)) from None
        return self._normPyInt(valu)

    def _normPyBool(self, valu):
        return self._normPyInt(int(valu))

    def _normPyInt(self, valu):
        '''
        Validate an integer against the bounds and ( strict ) enum values.

        Raises:
            s_exc.BadTypeValu: If the value is out of bounds or not a valid
                enum value.
        '''
        if self.minval is not None and valu < self.minval:
            mesg = f'value is below min={self.minval}'
            raise s_exc.BadTypeValu(valu=repr(valu), name=self.name, mesg=mesg)

        if self.maxval is not None and valu > self.maxval:
            mesg = f'value is above max={self.maxval}'
            raise s_exc.BadTypeValu(valu=repr(valu), name=self.name, mesg=mesg)

        if self.enumrepr and self.enumstrict and valu not in self.enumrepr:
            mesg = 'Value is not a valid enum value.'
            raise s_exc.BadTypeValu(valu=valu, name=self.name, mesg=mesg)

        return valu, {}

    def _normPyFloat(self, valu):
        '''
        Norm a float by truncating it to an integer.

        Raises:
            s_exc.BadTypeValu: If the float has no integer form.
        '''
        try:
            ival = int(valu)
        except (ValueError, OverflowError) as e:
            # int() raises ValueError for nan and OverflowError for +/-inf;
            # report both as a bad value like the string normalizer does.
            raise s_exc.BadTypeValu(name=self.name, valu=repr(valu),
                                    mesg=str(e)) from None

        return self._normPyInt(ival)

    def repr(self, norm):
        '''
        Return the enum name for the value if one is defined, otherwise the
        decimal string.
        '''
        text = self.enumrepr.get(norm)
        if text is not None:
            return text

        return str(norm)
[docs]
class Float(Type):
    '''
    A floating point type with optional open or closed value bounds.
    '''

    _opt_defs = (
        # Note: currently unused
        ('fmt', '%f'),  # type:ignore # Set to an float compatible format string to control repr.

        ('min', None),  # Set to a value to enforce minimum value for the type.
        ('minisvalid', True),  # Only valid if min is set.  True if min is itself a valid value (i.e. closed interval)
        ('max', None),  # Set to a value to enforce maximum value for the type.
        ('maxisvalid', True),  # Only valid if max is set.  True if max is itself a valid value (i.e. closed interval)
    )

    stortype = s_layer.STOR_TYPE_FLOAT64

    def __init__(self, modl, name, info, opts):

        Type.__init__(self, modl, name, info, opts)

        for cmprname, ctor in (
            ('>=', self._ctorCmprGe),
            ('<=', self._ctorCmprLe),
            ('>', self._ctorCmprGt),
            ('<', self._ctorCmprLt),
        ):
            self.setCmprCtor(cmprname, ctor)

        self.storlifts.update({
            '<': self._storLiftNorm,
            '>': self._storLiftNorm,
            '<=': self._storLiftNorm,
            '>=': self._storLiftNorm,
            'range=': self._storLiftRange,
        })

    def _storLiftRange(self, cmpr, valu):
        # norm both ends of the (min, max) pair
        lower = self.norm(valu[0])[0]
        upper = self.norm(valu[1])[0]
        return ((cmpr, (lower, upper), self.stortype),)

    # Each ordering ctor norms the right-hand-side once and returns a
    # function which compares a left-hand-side value against it.

    def _ctorCmprGe(self, text):
        limit = self.norm(text)[0]
        return lambda valu: valu >= limit

    def _ctorCmprLe(self, text):
        limit = self.norm(text)[0]
        return lambda valu: valu <= limit

    def _ctorCmprGt(self, text):
        limit = self.norm(text)[0]
        return lambda valu: valu > limit

    def _ctorCmprLt(self, text):
        limit = self.norm(text)[0]
        return lambda valu: valu < limit

    def postTypeInit(self):
        '''
        Read the bounds from the options and register the normalizers.

        The mincmp / maxcmp attributes are only set when the matching bound
        is configured.
        '''
        self.enumnorm = {}
        self.enumrepr = {}

        self.minval = self.opts.get('min')
        self.maxval = self.opts.get('max')

        if self.minval is not None:
            # an inclusive bound when minisvalid is set, exclusive otherwise
            if self.opts.get('minisvalid'):
                self.mincmp = lambda x, y: x >= y
            else:
                self.mincmp = lambda x, y: x > y

        if self.maxval is not None:
            # an inclusive bound when maxisvalid is set, exclusive otherwise
            if self.opts.get('maxisvalid'):
                self.maxcmp = lambda x, y: x <= y
            else:
                self.maxcmp = lambda x, y: x < y

        for typo, func in (
            (str, self._normPyStr),
            (int, self._normPyInt),
            (float, self._normPyFloat),
            (decimal.Decimal, self._normPyInt),
            (s_stormtypes.Number, self._normNumber),
        ):
            self.setNormFunc(typo, func)

    def _normPyStr(self, valu):
        '''
        Norm float text.

        Raises:
            s_exc.BadTypeValu: If the text is not a valid float.
        '''
        try:
            fval = float(valu)
        except ValueError as e:
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg=str(e)) from None

        return self._normPyFloat(fval)

    def _normPyInt(self, valu):
        # also used for Decimal values
        return self._normPyFloat(float(valu))

    def _normNumber(self, valu):
        # storm Number objects carry their numeric value in .valu
        fval = float(valu.valu)
        return self._normPyFloat(fval)

    def _normPyFloat(self, valu):
        '''
        Validate a float against the configured bounds.

        Raises:
            s_exc.BadTypeValu: If the value is outside the bounds.
        '''
        toolow = self.minval is not None and not self.mincmp(valu, self.minval)
        if toolow:
            mesg = f'value is below min={self.minval}'
            raise s_exc.BadTypeValu(valu=valu, name=self.name, mesg=mesg)

        toohigh = self.maxval is not None and not self.maxcmp(valu, self.maxval)
        if toohigh:
            mesg = f'value is above max={self.maxval}'
            raise s_exc.BadTypeValu(valu=valu, name=self.name, mesg=mesg)

        return valu, {}

    def repr(self, norm):
        '''
        Return the string form of the float.
        '''
        return str(norm)
[docs]
class Ival(Type):
    '''
    An interval, i.e. a range, of times
    '''

    stortype = s_layer.STOR_TYPE_IVAL

    def postTypeInit(self):
        '''
        Set the size limits, swap the range comparator for "@=" and register
        the normalizers and the "@=" lift handler.
        '''
        self.futsize = 0x7fffffffffffffff
        self.maxsize = 253402300799999  # 9999/12/31 23:59:59.999

        self.timetype = self.modl.type('time')

        # Range stuff with ival's don't make sense
        # self.indxcmpr.pop('range=', None)
        self._cmpr_ctors.pop('range=', None)

        self.setCmprCtor('@=', self._ctorCmprAt)

        # _ctorCmprAt implements its own custom norm-style resolution
        for typo, func in (
            (int, self._normPyInt),
            (str, self._normPyStr),
            (list, self._normPyIter),
            (tuple, self._normPyIter),
            (decimal.Decimal, self._normPyInt),
            (s_stormtypes.Number, self._normNumber),
        ):
            self.setNormFunc(typo, func)

        self.storlifts.update({
            '@=': self._storLiftAt,
        })

    def _storLiftAt(self, cmpr, valu):
        '''
        Handle "@=" lifts; list/tuple values are resolved as a tick/tock pair
        and anything else is norm'd as a single interval.
        '''
        if type(valu) in (list, tuple):
            ticktock = self.timetype.getTickTock(valu)
            return (
                ('@=', ticktock, self.stortype),
            )

        return self._storLiftNorm(cmpr, valu)

    def _ctorCmprAt(self, valu):
        '''
        Build the "@=" ( overlap ) comparator for the given interval.

        Raises:
            s_exc.NoSuchFunc: If valu is not a str, int, list or tuple.
        '''

        def never(item):
            return False

        if valu is None or valu == (None, None):
            return never

        if isinstance(valu, (str, int)):
            norm = self.norm(valu)[0]

        elif isinstance(valu, (list, tuple)):
            minv, maxv = self._normByTickTock(valu)[0]
            # User has input the nullset in a comparison operation.
            if minv >= maxv:
                return never

            norm = (minv, maxv)

        else:
            raise s_exc.NoSuchFunc(name=self.name,
                                   mesg='no norm for @= operator: %r' % (type(valu),))

        def cmpr(item):
            if item is None or item == (None, None):
                return False

            othr, info = self.norm(item)

            # the intervals overlap when each begins before the other ends
            return othr[0] < norm[1] and othr[1] > norm[0]

        return cmpr

    def _normPyInt(self, valu):
        # a single time becomes the interval (time, time + 1)
        tick, _ = self.timetype._normPyInt(valu)
        tock, info = self.timetype._normPyInt(tick + 1)
        return (tick, tock), info

    def _normNumber(self, valu):
        # storm Number objects carry their numeric value in .valu
        return self._normPyInt(valu.valu)

    def _normRelStr(self, valu, relto=None):
        '''
        Norm a relative time string against relto ( or the current time ).
        '''
        text = valu.strip().lower()

        # assumes the relative string starts with a - or +
        delt = s_time.delta(text)

        if not relto:
            relto = s_common.now()

        return self.timetype._normPyInt(delt + relto)[0]

    def _normPyStr(self, valu):
        '''
        Norm a string which is either a single time or "min,max".

        Raises:
            s_exc.BadTypeValu: If the text is "?".
        '''
        text = valu.strip().lower()

        if text == '?':
            raise s_exc.BadTypeValu(name=self.name, valu=text, mesg='interval requires begin time')

        if ',' in text:
            pair = text.split(',', 1)
            return self._normByTickTock(pair)

        tick, _ = self.timetype.norm(text)
        # Norm is guaranteed to be a valid time value, but norm +1 may not be
        tock, info = self.timetype._normPyInt(tick + 1)
        return (tick, tock), info

    def _normPyIter(self, valu):
        '''
        Norm a (min, max) pair into an actual range.

        Raises:
            s_exc.BadTypeValu: If min is after max or max exceeds futsize.
        '''
        (minv, maxv), info = self._normByTickTock(valu)

        if minv == maxv:
            maxv += 1

        # Norm via iter must produce an actual range.
        if minv > maxv:
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg='Ival range must in (min, max) format')

        if maxv > self.futsize:
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg='Ival upper range cannot exceed future size marker')

        return (minv, maxv), info

    def _normByTickTock(self, valu):
        '''
        Resolve a two item sequence into normalized (min, max) times.

        Raises:
            s_exc.BadTypeValu: If valu does not hold exactly 2 items.
        '''
        if len(valu) != 2:
            raise s_exc.BadTypeValu(name=self.name, valu=valu,
                                    mesg='Ival _normPyIter requires 2 items')

        tick, tock = self.timetype.getTickTock(valu)

        lower = self.timetype._normPyInt(tick)[0]
        upper = self.timetype._normPyInt(tock)[0]
        return (lower, upper), {}

    def merge(self, oldv, newv):
        '''
        Merge two intervals into the smallest interval covering both.
        '''
        lower = min(oldv[0], newv[0])
        upper = max(oldv[1], newv[1])
        return (lower, upper)

    def repr(self, norm):
        '''
        Return a (min, max) tuple of time reprs.
        '''
        return (self.timetype.repr(norm[0]), self.timetype.repr(norm[1]))
[docs]
class Loc(Type):
    '''
    A dot-separated hierarchical location string ( for example "us.va" ),
    normalized to lowercase with whitespace collapsed inside each part.
    '''

    stortype = s_layer.STOR_TYPE_LOC

    def postTypeInit(self):
        '''
        Register the normalizer, the prefix comparator and the lift handlers.
        '''
        self.setNormFunc(str, self._normPyStr)
        self.setCmprCtor('^=', self._ctorCmprPref)
        self.setLiftHintCmprCtor('^=', self._ctorCmprPref)
        self.storlifts.update({
            '=': self._storLiftEq,
            '^=': self._storLiftPref,
        })

    def _storLiftEq(self, cmpr, valu):
        '''
        Handle "=" lifts; a string ending in ".*" is lifted by prefix.

        Non-string values are handed straight to norm() so that unsupported
        types fail with the usual norm error instead of an AttributeError
        from the ".*" check.
        '''
        if isinstance(valu, str) and valu.endswith('.*'):
            norm, info = self.norm(valu[:-2])
            return (
                ('^=', norm, self.stortype),
            )

        norm, info = self.norm(valu)
        return (
            ('=', norm, self.stortype),
        )

    def _storLiftPref(self, cmpr, valu):
        norm, info = self.norm(valu)
        return (
            ('^=', norm, self.stortype),
        )

    def _normPyStr(self, valu):
        '''
        Lowercase and strip the text, then collapse runs of whitespace
        inside each dot-separated part.
        '''
        valu = valu.lower().strip()

        norms = [' '.join(part.split()) for part in valu.split('.')]
        norm = '.'.join(norms)
        return norm, {}

    @s_cache.memoizemethod()
    def stems(self, valu):
        '''
        Return every leading prefix of the location, shortest first.

        For example "a.b.c" gives ["a", "a.b", "a.b.c"].
        '''
        norm, info = self.norm(valu)
        parts = norm.split('.')
        return ['.'.join(parts[:size]) for size in range(1, len(parts) + 1)]

    def _ctorCmprPref(self, text):
        '''
        Build a comparator which matches the location itself or any location
        beneath it in the hierarchy.
        '''
        norm, _ = self.norm(text)

        def cmpr(valu):
            # Shortcut equality
            if valu == norm:
                return True

            vstems = self.stems(valu)
            return norm in vstems

        return cmpr

    def repr(self, norm):
        '''
        The normalized string is already the display form.
        '''
        return norm
[docs]
class Ndef(Type):
    '''
    A node definition type: a (form name, value) pair where the value is
    normalized by the type of the named form.

    The optional "forms" and "interfaces" options restrict which forms are
    accepted.
    '''
    stortype = s_layer.STOR_TYPE_NDEF

    def postTypeInit(self):
        '''
        Register the list/tuple normalizers and build the optional form filter.
        '''
        for pytype in (list, tuple):
            self.setNormFunc(pytype, self._normPyTuple)

        self.formfilter = None

        self.forms = self.opts.get('forms')
        self.ifaces = self.opts.get('interfaces')

        if not (self.forms or self.ifaces):
            return

        forms = None if self.forms is None else set(self.forms)
        ifaces = None if self.ifaces is None else set(self.ifaces)

        def filtfunc(form):
            # Returns True when the form must be rejected.
            if self.forms is not None and form.name in forms:
                return False

            if self.ifaces is not None:
                if any(iface in ifaces for iface in form.ifaces.keys()):
                    return False

            return True

        self.formfilter = filtfunc

    def _normStormNode(self, valu):
        '''
        Normalize a node by using its ndef.
        '''
        return self._normPyTuple(valu.ndef)

    def _normPyTuple(self, valu):
        '''
        Normalize a (form name, value) pair.

        Returns:
            ((form name, normalized value), info): info carries an "adds"
            entry for the referenced node and a "form" sub.

        Raises:
            s_exc.BadTypeValu: If valu cannot be unpacked into two items or
                the form is rejected by the form filter.
            s_exc.NoSuchForm: If the form name is not in the data model.
        '''
        try:
            formname, formvalu = valu
        except Exception as e:
            raise s_exc.BadTypeValu(name=self.name, valu=valu, mesg=str(e)) from None

        form = self.modl.form(formname)
        if form is None:
            raise s_exc.NoSuchForm.init(formname)

        if self.formfilter is not None and self.formfilter(form):
            pieces = [f'Ndef of form {formname} is not allowed as a value for {self.name} with form filter']

            if self.forms is not None:
                pieces.append(f' forms={self.forms}')

            if self.ifaces is not None:
                pieces.append(f' interfaces={self.ifaces}')

            mesg = ''.join(pieces)
            raise s_exc.BadTypeValu(valu=formname, name=self.name, mesg=mesg, forms=self.forms, interfaces=self.ifaces)

        formnorm, forminfo = form.type.norm(formvalu)

        info = {
            'adds': ((form.name, formnorm, forminfo),),
            'subs': {'form': form.name},
        }
        return (form.name, formnorm), info

    def repr(self, norm):
        '''
        Get the display form: the form name paired with the repr of the
        value from the form's type.

        Raises:
            s_exc.NoSuchForm: If the form name is not in the data model.
        '''
        formname, formvalu = norm

        form = self.modl.form(formname)
        if form is None:
            raise s_exc.NoSuchForm.init(formname)

        return (formname, form.type.repr(formvalu))
[docs]
class Edge(Type):
    '''
    A type (flagged deprecated in postTypeInit) holding a pair of ndefs:
    (n1, n2).

    The optional "n1:forms" / "n2:forms" options restrict the form of each side.
    '''
    stortype = s_layer.STOR_TYPE_MSGP

    def getCompOffs(self, name):
        '''
        Get the tuple offset of a named field, or None if it is unknown.
        '''
        return self.fieldoffs.get(name)

    def postTypeInit(self):
        '''
        Set up the field offsets, ndef type, form limits and normalizers.
        '''
        self.deprecated = True

        self.fieldoffs = {'n1': 0, 'n2': 1}
        self.ndeftype = self.modl.types.get('ndef')  # type: Ndef

        self.n1forms = self.opts.get('n1:forms')
        self.n2forms = self.opts.get('n2:forms')

        for pytype in (list, tuple):
            self.setNormFunc(pytype, self._normPyTuple)

    def _initEdgeBase(self, n1, n2):
        '''
        Normalize both ndefs and enforce the optional form restrictions.

        Returns:
            ((n1, n2), info): info carries the n1 / n1:form / n2 / n2:form subs.

        Raises:
            s_exc.BadTypeValu: If either ndef has a form which is not allowed.
        '''
        n1, _ = self.ndeftype.norm(n1)
        if self.n1forms is not None and n1[0] not in self.n1forms:
            raise s_exc.BadTypeValu(valu=n1[0], name=self.name, mesg='Invalid source node for edge type')

        n2, _ = self.ndeftype.norm(n2)
        if self.n2forms is not None and n2[0] not in self.n2forms:
            raise s_exc.BadTypeValu(valu=n2[0], name=self.name, mesg='Invalid dest node for edge type')

        subs = {
            'n1': n1,
            'n1:form': n1[0],
            'n2': n2,
            'n2:form': n2[0],
        }
        return (n1, n2), {'subs': subs}

    def _normPyTuple(self, valu):
        '''
        Normalize a 2-item (ndef, ndef) list or tuple.

        Raises:
            s_exc.BadTypeValu: If valu does not contain exactly two items.
        '''
        if len(valu) != 2:
            mesg = 'edge requires (ndef, ndef)'
            raise s_exc.BadTypeValu(mesg=mesg, name=self.name, valu=valu)

        src, dst = valu
        return self._initEdgeBase(src, dst)

    def repr(self, norm):
        '''
        Get the display form: the ndef repr of each side.
        '''
        src, dst = norm
        return (self.ndeftype.repr(src), self.ndeftype.repr(dst))
[docs]
class TimeEdge(Edge):
    '''
    An Edge with a third "time" field: (n1, n2, time).
    '''
    stortype = s_layer.STOR_TYPE_MSGP

    def getCompOffs(self, name):
        '''
        Get the tuple offset of a named field, or None if it is unknown.
        '''
        return self.fieldoffs.get(name)

    def postTypeInit(self):
        '''
        Run the Edge setup and register the offset of the time field.
        '''
        Edge.postTypeInit(self)
        self.fieldoffs['time'] = 2

    def _normPyTuple(self, valu):
        '''
        Normalize a 3-item (ndef, ndef, time) list or tuple.

        Returns:
            ((n1, n2, time), info): The Edge info with an added "time" sub.

        Raises:
            s_exc.BadTypeValu: If valu does not contain exactly three items.
        '''
        if len(valu) != 3:
            mesg = f'timeedge requires (ndef, ndef, time), got {valu}'
            raise s_exc.BadTypeValu(mesg=mesg, name=self.name, valu=valu)

        src, dst, when = valu

        # the time is normalized before the ndefs
        when, _ = self.modl.types.get('time').norm(when)
        (src, dst), info = self._initEdgeBase(src, dst)

        info['subs']['time'] = when
        return (src, dst, when), info

    def repr(self, norm):
        '''
        Get the display form: the ndef repr of each side plus the time repr.
        '''
        src, dst, when = norm
        srcrepr = self.ndeftype.repr(src)
        dstrepr = self.ndeftype.repr(dst)
        whenrepr = self.modl.type('time').repr(when)
        return (srcrepr, dstrepr, whenrepr)
[docs]
class Data(Type):
    '''
    A type for arbitrary JSON-safe data, optionally validated against the
    "schema" option.
    '''
    stortype = s_layer.STOR_TYPE_MSGP

    def postTypeInit(self):
        '''
        Build a validator when the "schema" option is set.
        '''
        self.validator = None

        schema = self.opts.get('schema')
        if schema is None:
            return

        self.validator = s_config.getJsValidator(schema)

    def norm(self, valu):
        '''
        Check that the value is JSON safe (and passes the validator, when one
        is configured), then return it after a msgpack encode/decode round trip.

        Returns:
            (valu, dict): The round-tripped value and an empty info dict.

        Raises:
            s_exc.BadTypeValu: If the value is not JSON safe or violates the schema.
        '''
        try:
            s_common.reqjsonsafe(valu)
            if self.validator is not None:
                self.validator(valu)
        except (s_exc.MustBeJsonSafe, s_exc.SchemaViolation) as e:
            raise s_exc.BadTypeValu(name=self.name, mesg=f'{e}: {s_common.trimText(repr(valu))}') from None

        return s_msgpack.un(s_msgpack.en(valu)), {}
[docs]
class NodeProp(Type):
    '''
    A (property name, value) pair where the value is normalized by the type
    of the named property.
    '''
    stortype = s_layer.STOR_TYPE_MSGP

    def postTypeInit(self):
        '''
        Register the str, list and tuple normalizers.
        '''
        self.setNormFunc(str, self._normPyStr)
        for pytype in (list, tuple):
            self.setNormFunc(pytype, self._normPyTuple)

    def _normPyStr(self, valu):
        '''
        Normalize "<prop>=<valu>" text by splitting on the first "=".
        '''
        return self._normPyTuple(valu.split('=', 1))

    def _normPyTuple(self, valu):
        '''
        Normalize a (property name, value) pair.

        Returns:
            ((full prop name, normalized value), info): info has a "prop" sub.

        Raises:
            s_exc.BadTypeValu: If valu does not contain exactly two items.
            s_exc.NoSuchProp: If the property is not in the data model.
        '''
        if len(valu) != 2:
            mesg = f'Must be a 2-tuple: {s_common.trimText(repr(valu))}'
            raise s_exc.BadTypeValu(name=self.name, numitems=len(valu), mesg=mesg) from None

        propname, propvalu = valu

        prop = self.modl.prop(propname)
        if prop is None:
            mesg = f'No prop {propname}'
            raise s_exc.NoSuchProp(mesg=mesg, name=self.name, prop=propname)

        propnorm, _ = prop.type.norm(propvalu)

        subs = {'prop': prop.full}
        return (prop.full, propnorm), {'subs': subs}
[docs]
class Range(Type):
    '''
    A (min, max) pair of values which share a single subtype.

    The required "type" option is a (type name, type opts) tuple naming
    the subtype.
    '''
    stortype = s_layer.STOR_TYPE_MSGP

    _opt_defs = (
        ('type', None),  # type: ignore
    )

    def postTypeInit(self):
        '''
        Resolve the subtype from the "type" option and register normalizers.

        Raises:
            s_exc.BadTypeDef: If the option is not a 2-tuple or the subtype
                cannot be constructed.
        '''
        subtype = self.opts.get('type')
        if type(subtype) is not tuple or len(subtype) != 2:
            raise s_exc.BadTypeDef(self.opts, name=self.name)

        typename, typeopts = subtype[0], subtype[1]
        try:
            self.subtype = self.modl.type(typename).clone(typeopts)
        except Exception:
            logger.exception('subtype invalid or unavailable')
            raise s_exc.BadTypeDef(self.opts, name=self.name, mesg='subtype invalid or unavailable')

        self.setNormFunc(str, self._normPyStr)
        for pytype in (tuple, list):
            self.setNormFunc(pytype, self._normPyTuple)

    def _normPyStr(self, valu):
        '''
        Normalize "<min>-<max>" text by splitting on the first "-".
        '''
        return self._normPyTuple(valu.split('-', 1))

    def _normPyTuple(self, valu):
        '''
        Normalize a (min, max) pair using the subtype.

        Returns:
            ((min, max), info): info has "min" and "max" subs.

        Raises:
            s_exc.BadTypeValu: If valu does not contain exactly two items or
                min is greater than max.
        '''
        if len(valu) != 2:
            mesg = f'Must be a 2-tuple of type {self.subtype.name}: {s_common.trimText(repr(valu))}'
            raise s_exc.BadTypeValu(numitems=len(valu), name=self.name, mesg=mesg)

        lower = self.subtype.norm(valu[0])[0]
        upper = self.subtype.norm(valu[1])[0]

        if lower > upper:
            raise s_exc.BadTypeValu(valu=valu, name=self.name,
                                    mesg='minval cannot be greater than maxval')

        subs = {'min': lower, 'max': upper}
        return (lower, upper), {'subs': subs}

    def repr(self, norm):
        '''
        Get the display form: the subtype repr of each side.
        '''
        return (self.subtype.repr(norm[0]), self.subtype.repr(norm[1]))
[docs]
class Str(Type):
    '''
    A string type with optional normalization and validation rules.

    Options (see _opt_defs):
        enums (str): Comma separated list of the only values allowed once
            the value has been normalized.
        regex (str): A pattern the normalized value must match (applied
            with match()); named groups are returned as subs.
        lower (bool): Lowercase the value.
        strip (bool): Strip leading and trailing whitespace.
        replace (tuple): (look, repl) pairs applied with str.replace().
        onespace (bool): Pass the value through s_chop.onespace().
        globsuffix (bool): Treat a trailing "*" on an "=" lift as a prefix lift.
    '''
    stortype = s_layer.STOR_TYPE_UTF8

    _opt_defs = (
        ('enums', None),  # type: ignore
        ('regex', None),
        ('lower', False),
        ('strip', False),
        ('replace', ()),
        ('onespace', False),
        ('globsuffix', False),
    )

    def repr(self, norm):
        '''
        The display form is the normalized string itself.
        '''
        return norm

    def postTypeInit(self):
        '''
        Register normalizers and storage lifts, compile the optional regex
        and split the optional enum list.
        '''
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(int, self._normPyInt)
        self.setNormFunc(bool, self._normPyBool)
        self.setNormFunc(float, self._normPyFloat)
        self.setNormFunc(decimal.Decimal, self._normPyInt)
        self.setNormFunc(s_stormtypes.Number, self._normNumber)

        self.storlifts.update({
            '=': self._storLiftEq,
            '^=': self._storLiftPref,
            '~=': self._storLiftRegx,
            'range=': self._storLiftRange,
        })

        self.regex = None
        restr = self.opts.get('regex')
        if restr is not None:
            self.regex = regex.compile(restr)

        self.envals = None
        enumstr = self.opts.get('enums')
        if enumstr is not None:
            self.envals = enumstr.split(',')

    def _storLiftEq(self, cmpr, valu):
        '''
        Build the lift for "=". With the globsuffix option set, a value
        ending in "*" becomes a prefix lift on the text before the "*".
        '''
        if self.opts.get('globsuffix') and valu.endswith('*'):
            return (
                ('^=', valu[:-1], self.stortype),
            )

        return self._storLiftNorm(cmpr, valu)

    def _storLiftRange(self, cmpr, valu):
        '''
        Build the lift for "range=" from a (min, max) pair of strings.
        '''
        minx = self._normForLift(valu[0])
        maxx = self._normForLift(valu[1])
        return (
            (cmpr, (minx, maxx), self.stortype),
        )

    def _normForLift(self, valu):
        '''
        Apply the lower/replace/strip/onespace options to a lift value
        without requiring that the value be fully normable.
        '''
        # doesnt have to be normable...
        if self.opts.get('lower'):
            valu = valu.lower()

        for look, repl in self.opts.get('replace', ()):
            valu = valu.replace(look, repl)

        # Only strip the left side of the string for prefix match
        if self.opts.get('strip'):
            valu = valu.lstrip()

        if self.opts.get('onespace'):
            valu = s_chop.onespace(valu)

        return valu

    def _storLiftPref(self, cmpr, valu):
        '''
        Build the lift for a "^=" prefix lift.
        '''
        valu = self._normForLift(valu)
        return (('^=', valu, self.stortype),)

    def _storLiftRegx(self, cmpr, valu):
        '''
        Build the lift for "~="; the pattern is passed through unchanged.
        '''
        return ((cmpr, valu, self.stortype),)

    def _normPyBool(self, valu):
        '''
        Normalize a bool as the lowercased text of the value.
        '''
        return self._normPyStr(str(valu).lower())

    def _normPyInt(self, valu):
        '''
        Normalize an int (or decimal.Decimal) using its str() form.
        '''
        return self._normPyStr(str(valu))

    def _normNumber(self, valu):
        '''
        Normalize a storm Number using the str() form of its wrapped value.
        '''
        return self._normPyStr(str(valu.valu))

    def _normPyFloat(self, valu):
        '''
        Normalize a float by formatting it as a fixed-point ("f") decimal string.
        '''
        deci = s_common.hugectx.create_decimal(str(valu))
        return self._normPyStr(format(deci, 'f'))

    def _normPyStr(self, valu):
        '''
        Normalize a string by applying the configured options, then check it
        against the optional enum list and regex.

        Returns:
            (str, dict): The normalized string and an info dict which has
            "subs" when the regex has named groups.

        Raises:
            s_exc.BadTypeValu: If the value is not one of the enums or does
                not match the regex.
        '''
        info = {}
        norm = str(valu)

        if self.opts['lower']:
            norm = norm.lower()

        for look, repl in self.opts.get('replace', ()):
            norm = norm.replace(look, repl)

        if self.opts['strip']:
            norm = norm.strip()

        if self.opts['onespace']:
            norm = s_chop.onespace(norm)

        if self.envals is not None:
            if norm not in self.envals:
                # "enums" is a type option, so report it from self.opts
                # (self.info holds the type info and has no enums entry).
                raise s_exc.BadTypeValu(valu=valu, name=self.name, enums=self.opts.get('enums'),
                                        mesg='Value not in enums')

        if self.regex is not None:

            match = self.regex.match(norm)
            if match is None:
                raise s_exc.BadTypeValu(name=self.name, valu=valu, regx=self.regex.pattern,
                                        mesg=f'[{valu}] does not match [{self.regex.pattern}]')

            subs = match.groupdict()
            if subs:
                info['subs'] = subs

        return norm, info
# Matches each run of word characters inside a single taxon.
taxonre = regex.compile(r'\w+')
[docs]
class Taxon(Str):
    '''
    A single taxonomy element: lowercased word characters joined by "_".
    '''

    def postTypeInit(self):
        '''
        Run the Str setup and register the taxon str normalizer.
        '''
        Str.postTypeInit(self)
        self.setNormFunc(str, self._normPyStr)

    def _normForLift(self, valu):
        '''
        Lift values are fully normalized.
        '''
        norm = self.norm(valu)[0]
        return norm

    def _normPyStr(self, valu):
        '''
        Lowercase and strip the value, then join each run of word characters
        with "_".

        Raises:
            s_exc.BadTypeValu: If nothing remains after normalization.
        '''
        words = taxonre.findall(valu.lower().strip())
        norm = '_'.join(words)

        if not norm:
            mesg = 'Each taxon must be non-zero length.'
            raise s_exc.BadTypeValu(mesg=mesg)

        return norm, {}
[docs]
class Taxonomy(Str):
    '''
    A dot-separated hierarchy of taxon values. The normalized form always
    ends with a trailing "." and the display form drops it.
    '''

    def postTypeInit(self):
        '''
        Run the Str setup, register the str/list/tuple normalizers and
        resolve the taxon type used for each element.
        '''
        Str.postTypeInit(self)
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(list, self._normPyList)
        self.setNormFunc(tuple, self._normPyList)
        self.taxon = self.modl.type('taxon')

    def _ctorCmprPref(self, valu):
        '''
        Construct a ^= comparator which tests whether a value starts with
        the lift-normalized text.
        '''
        norm = self._normForLift(valu)

        def cmpr(valu):
            return valu.startswith(norm)

        return cmpr

    def _normForLift(self, valu):
        '''
        Normalize a value for lifting.

        When the caller's text does not end with ".", the trailing "." added
        by normalization is removed so that a partial final element can
        still be matched as a prefix.
        '''
        norm = self.norm(valu)[0]
        if isinstance(valu, str) and not valu.strip().endswith('.'):
            return norm.rstrip('.')
        return norm

    def _normPyList(self, valu):
        '''
        Normalize a list/tuple of taxon values.

        Returns:
            (str, dict): The joined taxonomy (with trailing ".") and an info
            dict with "base" and "depth" subs, plus "parent" when there is
            more than one element.

        Raises:
            s_exc.BadTypeValu: If valu is empty or an element is not a
                valid taxon.
        '''
        toks = [self.taxon.norm(v)[0] for v in valu]

        # An empty list/tuple would otherwise fail on toks[-1] with an IndexError.
        if not toks:
            mesg = 'A taxonomy must contain at least one taxon.'
            raise s_exc.BadTypeValu(valu=valu, name=self.name, mesg=mesg)

        subs = {
            'base': toks[-1],
            'depth': len(toks) - 1,
        }

        if len(toks) > 1:
            subs['parent'] = '.'.join(toks[:-1]) + '.'

        norm = '.'.join(toks) + '.'
        return norm, {'subs': subs}

    def _normPyStr(self, text):
        '''
        Normalize dotted text by stripping whitespace and outer "."
        characters and then splitting on ".".
        '''
        return self._normPyList(text.strip().strip('.').split('.'))

    def repr(self, norm):
        '''
        The display form is the normalized value without trailing ".".
        '''
        return norm.rstrip('.')
[docs]
class Tag(Str):
    '''
    A dot-separated tag name whose elements are each normalized by the
    syn:tag:part type.
    '''

    def postTypeInit(self):
        '''
        Run the Str setup, register the str/list/tuple normalizers and
        resolve the tag part type.
        '''
        Str.postTypeInit(self)
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(list, self._normPyList)
        self.setNormFunc(tuple, self._normPyList)
        self.tagpart = self.modl.type('syn:tag:part')

    def _normPyList(self, valu):
        '''
        Normalize a list/tuple of tag parts.

        Returns:
            (str, dict): The dotted tag and an info dict with "subs"
            ("base", "depth" and, for nested tags, "up") and "toks".

        Raises:
            s_exc.BadTypeValu: If valu is empty, a part is invalid, the tag
                does not match s_grammar.tagre, or the cortex (when one is
                attached to the model) reports the tag as invalid.
        '''
        toks = [self.tagpart.norm(v)[0] for v in valu]

        # An empty list/tuple would otherwise fail on toks[-1] with an IndexError.
        if not toks:
            mesg = 'A tag must contain at least one tag part.'
            raise s_exc.BadTypeValu(valu=valu, name=self.name, mesg=mesg)

        subs = {
            'base': toks[-1],
            'depth': len(toks) - 1,
        }

        if len(toks) > 1:
            subs['up'] = '.'.join(toks[:-1])

        norm = '.'.join(toks)

        if not s_grammar.tagre.fullmatch(norm):
            mesg = f'Tag does not match tagre: [{s_grammar.tagre.pattern}]'
            raise s_exc.BadTypeValu(valu=norm, name=self.name, mesg=mesg)

        # The model may not have a cortex attached; only check when it does.
        core = self.modl.core
        if core is not None:
            (ok, mesg) = core.isTagValid(norm)
            if not ok:
                raise s_exc.BadTypeValu(valu=norm, name=self.name, mesg=mesg)

        return norm, {'subs': subs, 'toks': toks}

    def _normPyStr(self, text):
        '''
        Normalize tag text by removing outer "#" characters and splitting on ".".
        '''
        toks = text.strip('#').split('.')
        return self._normPyList(toks)
# Matches each run of word characters inside a single tag part.
tagpartre = regex.compile(r'\w+')
[docs]
class TagPart(Str):
    '''
    A single element of a tag: lowercased word characters joined by "_".
    '''

    def postTypeInit(self):
        '''
        Run the Str setup and register the tag part str normalizer.
        '''
        Str.postTypeInit(self)
        self.setNormFunc(str, self._normPyStr)

    def _normPyStr(self, valu):
        '''
        Lowercase and strip the value, then join each run of word characters
        with "_".

        Raises:
            s_exc.BadTypeValu: If nothing remains after normalization.
        '''
        words = tagpartre.findall(valu.lower().strip())
        norm = '_'.join(words)

        if not norm:
            mesg = 'Each tag part must be non-zero length.'
            raise s_exc.BadTypeValu(mesg=mesg)

        return norm, {}
# Distance unit multipliers, expressed in millimeters ('mm' is 1).
speed_dist = {
    **dict.fromkeys(('mm', 'millimeters'), 1),
    **dict.fromkeys(('k', 'km', 'kilometers'), 1000000),
    'nmi': 1852000,
    **dict.fromkeys(('in', 'inches'), 25.4),
    **dict.fromkeys(('ft', 'feet'), 304.8),
    **dict.fromkeys(('mi', 'miles'), 1609344),
}

# Duration unit divisors, expressed in seconds ('s' is 1).
speed_dura = {
    **dict.fromkeys(('s', 'sec'), 1),
    **dict.fromkeys(('min', 'minute'), 60),
    **dict.fromkeys(('h', 'hr', 'hour'), 3600),
}
[docs]
class Velocity(IntBase):
    '''
    A velocity stored as an integer number of millimeters per second.

    Options:
        relative (bool): When True, negative velocities are allowed.
    '''
    # The "c" unit multiplier, in mm/sec.
    oflight = 299792458000
    stortype = s_layer.STOR_TYPE_I64

    _opt_defs = (
        ('relative', False),
    )

    def postTypeInit(self):
        '''
        Register the str and int normalizers.
        '''
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(int, self._normPyInt)

    def _normPyStr(self, valu):
        '''
        Normalize velocity text such as "<number> <dist>/<dura>", or a
        number followed by mph, kph, knots, kts or c.

        A missing number is treated as 1, and a number with no unit is
        taken as the normalized value itself.

        Returns:
            (int, dict): The velocity in mm/sec and an empty info dict.

        Raises:
            s_exc.BadTypeValu: If the text is empty, the number is malformed,
                the velocity is negative for a non-relative type, or a unit
                is not recognized.
        '''
        valu = valu.lower().strip()
        if not valu:
            mesg = 'Empty string is not a valid velocity.'
            raise s_exc.BadTypeValu(mesg=mesg)

        nums, offs = s_grammar.nom(valu, 0, cset='-0123456789.')
        if not nums:
            nums = '1'

        # The character set above admits malformed numbers ("-", ".", "1.2.3");
        # report those as a bad value rather than leaking a ValueError.
        try:
            base = float(nums)
        except ValueError:
            mesg = f'Invalid numeric value in velocity: {nums}.'
            raise s_exc.BadTypeValu(mesg=mesg) from None

        if base < 0 and not self.opts.get('relative'):
            mesg = 'Non-relative velocities may not be negative.'
            raise s_exc.BadTypeValu(mesg=mesg)

        unit = valu[offs:].strip()
        if not unit:
            return int(base), {}

        if '/' in unit:
            dist, dura = unit.split('/', 1)

            distmod = speed_dist.get(dist)
            if distmod is None:
                mesg = f'Unrecognized distance type: {dist}.'
                raise s_exc.BadTypeValu(mesg=mesg)

            duramod = speed_dura.get(dura)
            if duramod is None:
                mesg = f'Unrecognized duration type: {dura}.'
                raise s_exc.BadTypeValu(mesg=mesg)

            norm = int((base * distmod) / duramod)
            return norm, {}

        if unit == 'mph':
            norm = int((base * 1609344) / 3600)
            return norm, {}

        if unit == 'kph':
            norm = int((base * 1000000) / 3600)
            return norm, {}

        if unit in ('knots', 'kts'):
            norm = int((base * 1852000) / 3600)
            return norm, {}

        if unit == 'c':
            return int(base * self.oflight), {}

        mesg = f'Unknown velocity unit: {unit}.'
        raise s_exc.BadTypeValu(mesg=mesg)

    def _normPyInt(self, valu):
        '''
        Normalize an integer velocity.

        Raises:
            s_exc.BadTypeValu: If the value is negative for a non-relative type.
        '''
        if valu < 0 and not self.opts.get('relative'):
            mesg = 'Non-relative velocities may not be negative.'
            raise s_exc.BadTypeValu(mesg=mesg)
        return valu, {}
[docs]
class Duration(IntBase):
    '''
    A length of time stored as an integer in the units of s_time.onesec,
    s_time.onemin, s_time.onehour and s_time.oneday.
    '''
    stortype = s_layer.STOR_TYPE_U64

    _opt_defs = (
        ('signed', False),
    )

    def postTypeInit(self):
        '''
        Register the str and int normalizers.
        '''
        self.setNormFunc(str, self._normPyStr)
        self.setNormFunc(int, self._normPyInt)

    def _normPyInt(self, valu):
        '''
        Integers are accepted unchanged.
        '''
        return valu, {}

    def _normPyStr(self, text):
        '''
        Normalize duration text of the form "[<days>D] [[<hours>:]<minutes>:]<seconds>".

        Returns:
            (int, dict): The total duration and an empty info dict.

        Raises:
            s_exc.BadTypeValu: If the text is empty, has the wrong number of
                ":" separated fields, or contains a non-numeric field.
        '''
        text = text.strip()
        if not text:
            mesg = 'Duration string must have non-zero length.'
            raise s_exc.BadTypeValu(mesg=mesg)

        total = 0

        try:

            if 'D' in text:
                daystext, _, text = text.partition('D')
                total += int(daystext.strip(), 0) * s_time.oneday
                text = text.strip()

            if text and ':' in text:

                parts = text.split(':')
                if len(parts) not in (2, 3):
                    mesg = 'Invalid number of : characters for duration.'
                    raise s_exc.BadTypeValu(mesg=mesg)

                # The last field is (possibly fractional) seconds; the fields
                # before it are whole minutes, optionally preceded by hours.
                *whole, secs = parts
                units = (s_time.onehour, s_time.onemin)[-len(whole):]

                for part, unit in zip(whole, units):
                    total += int(part.strip()) * unit

                total += int(float(secs.strip()) * s_time.onesec)

            elif text:
                total += int(float(text) * s_time.onesec)

        except ValueError:
            mesg = f'Invalid numeric value in duration: {text}.'
            raise s_exc.BadTypeValu(mesg=mesg) from None

        return total, {}

    def repr(self, valu):
        '''
        Get the display form "[<days>D ]HH:MM:SS.mmm"; the days prefix is
        only present when the day count is non-zero.
        '''
        days, rem = divmod(valu, s_time.oneday)
        hours, rem = divmod(rem, s_time.onehour)
        minutes, rem = divmod(rem, s_time.onemin)
        seconds, millis = divmod(rem, s_time.onesec)

        clock = f'{hours:02}:{minutes:02}:{seconds:02}.{millis:03}'
        if days:
            return f'{days}D ' + clock
        return clock
[docs]
class Time(IntBase):
    '''
    A time type stored as an integer.

    Options:
        ismin (bool): merge() keeps the smaller of two values.
        ismax (bool): merge() keeps the larger of two values.
    '''
    stortype = s_layer.STOR_TYPE_TIME

    _opt_defs = (
        ('ismin', False),  # type: ignore
        ('ismax', False),
    )

    def postTypeInit(self):
        '''
        Set the size limits, register normalizers / comparators / lifts and
        pick the storage type for min or max times.
        '''
        # marker value used for "?", an unspecified time in the future
        self.futsize = 0x7fffffffffffffff
        self.maxsize = 253402300799999  # 9999/12/31 23:59:59.999

        self.setNormFunc(int, self._normPyInt)
        self.setNormFunc(str, self._normPyStr)

        self.setCmprCtor('@=', self._ctorCmprAt)

        self.ismin = self.opts.get('ismin')
        self.ismax = self.opts.get('ismax')

        self.storlifts['@='] = self._liftByIval

        if self.ismin:
            self.stortype = s_layer.STOR_TYPE_MINTIME
        elif self.ismax:
            self.stortype = s_layer.STOR_TYPE_MAXTIME

    def _liftByIval(self, cmpr, valu):
        '''
        Build the lift for "@=". A list/tuple is lifted as a tick/tock pair;
        anything else is lifted by equality on the normalized time.
        '''
        if type(valu) in (list, tuple):
            return ((cmpr, self.getTickTock(valu), self.stortype),)

        norm, _ = self.norm(valu)
        return (('=', norm, self.stortype),)

    def _storLiftRange(self, cmpr, valu):
        '''
        Build the lift for "range=" from a list/tuple tick/tock pair.

        Raises:
            s_exc.BadTypeValu: If valu is not a list or tuple.
        '''
        if type(valu) not in (list, tuple):
            mesg = f'Range value must be a list: {valu!r}'
            raise s_exc.BadTypeValu(mesg=mesg)

        return ((cmpr, self.getTickTock(valu), self.stortype),)

    def _ctorCmprAt(self, valu):
        '''
        Delegate the "@=" comparator to the ival type.
        '''
        ivaltype = self.modl.types.get('ival')
        return ivaltype._ctorCmprAt(valu)

    def _normPyStr(self, valu):
        '''
        Normalize time text.

        Handles "now", "?" (the future marker), text made of an optional
        time followed by a +/- delta with a time unit, and plain time text
        with an optional timezone.

        Returns:
            (int, dict): The normalized time and an info dict.
        '''
        text = valu.strip().lower()

        if text == 'now':
            return self._normPyInt(s_common.now())

        # "?" stands for an unspecified time in the future
        if text == '?':
            return self.futsize, {}

        # split any timezone information off of the text
        text, base = s_time.parsetz(text)

        # Only treat the text as a relative expression when it names a time
        # unit and has a sign; otherwise it may just be a messy time string.
        hasunit = any(unit in text for unit in s_time.timeunits.keys())
        if hasunit and ('-' in text or '+' in text):

            splitter = '-' if '-' in text else '+'
            head, _, tail = text.partition(splitter)

            delt = s_time.delta(splitter + tail)

            if head:
                start = self._normPyStr(head)[0] + base
            else:
                start = s_common.now()

            return self._normPyInt(delt + start)

        return self._normPyInt(s_time.parse(text, base=base, chop=True))

    def _normPyInt(self, valu):
        '''
        Normalize an integer time.

        Raises:
            s_exc.BadTypeValu: If the value is larger than maxsize and is not
                the future marker.
        '''
        toobig = valu > self.maxsize
        if toobig and valu != self.futsize:
            mesg = f'Time exceeds max size [{self.maxsize}] allowed for a non-future marker, got {valu}'
            raise s_exc.BadTypeValu(mesg=mesg, valu=valu, name=self.name)

        return valu, {}

    def merge(self, oldv, newv):
        '''
        Merge two values: the minimum for ismin types, the maximum for ismax
        types, otherwise the new value.
        '''
        if self.ismin:
            return min(oldv, newv)

        if self.ismax:
            return max(oldv, newv)

        return newv

    def repr(self, valu):
        '''
        Get the display form; the future marker is shown as "?".
        '''
        if valu == self.futsize:
            return '?'

        return s_time.repr(valu)

    def _getLiftValu(self, valu, relto=None):
        '''
        Resolve a lift value to a time.

        Strings may be "now" or a +/- delta, which is applied to relto (or
        to the current time when relto is None). Everything else goes
        through norm().

        Raises:
            s_exc.BadTypeValu: If valu is an empty / whitespace-only string.
        '''
        if not isinstance(valu, str):
            return self.norm(valu)[0]

        lowr = valu.strip().lower()
        if not lowr:
            mesg = f'Invalid time provided, got [{valu}]'
            raise s_exc.BadTypeValu(mesg=mesg, name=self.name, valu=valu)

        if lowr == 'now':
            return s_common.now()

        if lowr.startswith(('-', '+')):
            delt = s_time.delta(lowr)

            if relto is None:
                relto = s_common.now()

            return self._normPyInt(delt + relto)[0]

        return self.norm(valu)[0]

    def getTickTock(self, vals):
        '''
        Get a tick, tock time pair.

        Args:
            vals (list): A pair of values to norm.

        Returns:
            (int, int): A ordered pair of integers.
        '''
        if len(vals) != 2:
            mesg = 'Time range must have a length of 2: %r' % (vals,)
            raise s_exc.BadTypeValu(mesg=mesg)

        val0, val1 = vals

        try:
            tick = self._getLiftValu(val0)
        except ValueError:
            mesg = f'Unable to process the value for val0 in _getLiftValu, got {val0}'
            raise s_exc.BadTypeValu(name=self.name, valu=val0,
                                    mesg=mesg) from None

        isstr = isinstance(val1, str)

        if isstr and val1.startswith(('+-', '-+')):
            # a "+-" window is centered on the tick
            delt = s_time.delta(val1[2:])
            tick, tock = tick - delt, tick + delt
            sortval = True
        else:
            # a "-" delta is relative to the tick and may land before it
            sortval = isstr and val1.startswith('-')
            tock = self._getLiftValu(val1, relto=tick)

        if sortval and tick >= tock:
            return min(tick, tock), max(tick, tock)

        return tick, tock

    def _ctorCmprRange(self, vals):
        '''
        Override default range= handler to account for relative computation.
        '''
        if not isinstance(vals, (list, tuple)) or len(vals) != 2:
            mesg = f'Must be a 2-tuple: {s_common.trimText(repr(vals))}'
            raise s_exc.BadCmprValu(itemtype=type(vals), cmpr='range=', mesg=mesg)

        tick, tock = self.getTickTock(vals)

        if tick > tock:
            # the requested range is empty so nothing can match
            def cmpr(valu):
                return False
        else:
            def cmpr(valu):
                return tick <= valu <= tock

        return cmpr

    def _ctorCmprLt(self, text):
        '''
        Construct a "<" comparator; text ending in "*" compares against the
        end of the wildcard range.
        '''
        if not isinstance(text, str):
            return IntBase._ctorCmprLt(self, text)

        strip = text.strip()
        if not strip.endswith('*'):
            return IntBase._ctorCmprLt(self, text)

        _, tock = s_time.wildrange(strip[:-1])

        def cmpr(valu):
            return valu < tock

        return cmpr

    def _ctorCmprLe(self, text):
        '''
        Construct a "<=" comparator; text ending in "*" compares against the
        end of the wildcard range.
        '''
        if not isinstance(text, str):
            return IntBase._ctorCmprLe(self, text)

        strip = text.strip()
        if not strip.endswith('*'):
            return IntBase._ctorCmprLe(self, text)

        _, tock = s_time.wildrange(strip[:-1])

        def cmpr(valu):
            return valu <= tock

        return cmpr

    def _ctorCmprEq(self, text):
        '''
        Construct an "=" comparator; text ending in "*" matches any value
        in the half-open wildcard range [tick, tock).
        '''
        if isinstance(text, str):
            strip = text.strip()
            if strip.endswith('*'):
                tick, tock = s_time.wildrange(strip[:-1])

                def wildcmpr(valu):
                    return valu >= tick and valu < tock

                return wildcmpr

        norm, _ = self.norm(text)

        def cmpr(valu):
            return norm == valu

        return cmpr

    def _storLiftNorm(self, cmpr, valu):
        '''
        Build the lift for a comparator. For "=", "<" and "<=" a string
        ending in "*" is expanded with s_time.wildrange(); everything else
        falls through to IntBase.
        '''
        if isinstance(valu, str) and cmpr in ('=', '<', '<='):
            text = valu.strip()
            if text.endswith('*'):
                tick, tock = s_time.wildrange(text[:-1])

                if cmpr == '=':
                    return (('range=', (tick, tock), self.stortype),)

                return ((cmpr, tock, self.stortype),)

        return IntBase._storLiftNorm(self, cmpr, valu)