import types
import pprint
import asyncio
import hashlib
import logging
import argparse
import contextlib
import collections
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.datamodel as s_datamodel
import synapse.lib.ast as s_ast
import synapse.lib.auth as s_auth
import synapse.lib.base as s_base
import synapse.lib.chop as s_chop
import synapse.lib.coro as s_coro
import synapse.lib.node as s_node
import synapse.lib.snap as s_snap
import synapse.lib.cache as s_cache
import synapse.lib.layer as s_layer
import synapse.lib.scope as s_scope
import synapse.lib.config as s_config
import synapse.lib.autodoc as s_autodoc
import synapse.lib.grammar as s_grammar
import synapse.lib.msgpack as s_msgpack
import synapse.lib.spooled as s_spooled
import synapse.lib.version as s_version
import synapse.lib.hashitem as s_hashitem
import synapse.lib.stormctrl as s_stormctrl
import synapse.lib.stormtypes as s_stormtypes
import synapse.lib.stormlib.graph as s_stormlib_graph
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Long-form help text for the ``trigger.add`` storm command; it is referenced
# as that command's 'descr' entry in the ``stormcmds`` tuple below.
addtriggerdescr = '''
Add a trigger to the cortex.
Notes:
Valid values for condition are:
* tag:add
* tag:del
* node:add
* node:del
* prop:set
* edge:add
* edge:del
When condition is tag:add or tag:del, you may optionally provide a form name
to restrict the trigger to fire only on tags added or deleted from nodes of
those forms.
The added tag is provided to the query in the ``$auto`` dictionary variable under
``$auto.opts.tag``. Usage of the ``$tag`` variable is deprecated and it will no longer
be populated in Synapse v3.0.0.
Simple one level tag globbing is supported, only at the end after a period,
that is aka.* matches aka.foo and aka.bar but not aka.foo.bar. aka* is not
supported.
When the condition is edge:add or edge:del, you may optionally provide a
form name or a destination form name to only fire on edges added or deleted
from nodes of those forms.
Examples:
# Adds a tag to every inet:ipv4 added
trigger.add node:add --form inet:ipv4 --query {[ +#mytag ]}
# Adds a tag #todo to every node as it is tagged #aka
trigger.add tag:add --tag aka --query {[ +#todo ]}
# Adds a tag #todo to every inet:ipv4 as it is tagged #aka
trigger.add tag:add --form inet:ipv4 --tag aka --query {[ +#todo ]}
# Adds a tag #todo to the N1 node of every refs edge add
trigger.add edge:add --verb refs --query {[ +#todo ]}
# Adds a tag #todo to the N1 node of every seen edge delete, provided that
# both nodes are of form file:bytes
trigger.add edge:del --verb seen --form file:bytes --n2form file:bytes --query {[ +#todo ]}
'''
# Long-form help text describing how to schedule a recurring cron job (its
# examples use ``cron.add``).
# NOTE(review): the consumer of this string is not visible in this part of the
# file; presumably it is the 'descr' of the cron.add command -- confirm.
addcrondescr = '''
Add a recurring cron job to a cortex.
Notes:
All times are interpreted as UTC.
All arguments are interpreted as the job period, unless the value ends in
an equals sign, in which case the argument is interpreted as the recurrence
period. Only one recurrence period parameter may be specified.
Currently, a fixed unit must not be larger than a specified recurrence
period. i.e. '--hour 7 --minute +15' (every 15 minutes from 7-8am?) is not
supported.
Valid values for fixed hours are 0-23 on a 24-hour clock where midnight is 0.
If the --day parameter value does not start with a '+' and is an integer, it is
interpreted as a fixed day of the month. A negative integer may be
specified to count from the end of the month with -1 meaning the last day
of the month. All fixed day values are clamped to valid days, so for
example '-d 31' will run on February 28.
If the fixed day parameter is a value in ([Mon, Tue, Wed, Thu, Fri, Sat,
Sun] if locale is set to English) it is interpreted as a fixed day of the
week.
Otherwise, if the parameter value starts with a '+', then it is interpreted
as a recurrence interval of that many days.
If no plus-sign-starting parameter is specified, the recurrence period
defaults to the unit larger than all the fixed parameters. e.g. '--minute 5'
means every hour at 5 minutes past, and --hour 3, --minute 1 means 3:01 every day.
At least one optional parameter must be provided.
All parameters accept multiple comma-separated values. If multiple
parameters have multiple values, all combinations of those values are used.
All fixed units not specified lower than the recurrence period default to
the lowest valid value, e.g. --month +2 will be scheduled at 12:00am the first of
every other month. One exception is if the largest fixed value is day of the
week, then the default period is set to be a week.
A month period with a day of week fixed value is not currently supported.
Fixed-value year (i.e. --year 2019) is not supported. See the 'at'
command for one-time cron jobs.
As an alternative to the above options, one may use exactly one of
--hourly, --daily, --monthly, --yearly with a colon-separated list of
fixed parameters for the value. It is an error to use both the individual
options and these aliases at the same time.
Examples:
Run a query every last day of the month at 3 am
cron.add --hour 3 --day -1 {#foo}
Run a query every 8 hours
cron.add --hour +8 {#foo}
Run a query every Wednesday and Sunday at midnight and noon
cron.add --hour 0,12 --day Wed,Sun {#foo}
Run a query every other day at 3:57pm
cron.add --day +2 --minute 57 --hour 15 {#foo}
'''
# Long-form help text describing how to schedule a one-time (non-recurring)
# cron job (its examples use ``cron.at``).
# NOTE(review): the consumer of this string is not visible in this part of the
# file; presumably it is the 'descr' of the cron.at command -- confirm.
atcrondescr = '''
Adds a non-recurring cron job.
Notes:
This command accepts one or more time specifications followed by exactly
one storm query in curly braces. Each time specification may be in synapse
time delta format (e.g --day +1) or synapse time format (e.g.
20501217030432101). Seconds will be ignored, as cron jobs' granularity is
limited to minutes.
All times are interpreted as UTC.
The other option for time specification is a relative time from now. This
consists of a plus sign, a positive integer, then one of 'minutes, hours,
days'.
Note that the record for a cron job is stored until explicitly deleted via
"cron.del".
Examples:
# Run a storm query in 5 minutes
cron.at --minute +5 {[inet:ipv4=1]}
# Run a storm query tomorrow and in a week
cron.at --day +1,+7 {[inet:ipv4=1]}
# Run a query at the end of the year Zulu
cron.at --dt 20181231Z2359 {[inet:ipv4=1]}
'''
# Help text describing a ``wget`` storm command, with usage examples.
# NOTE(review): the consumer of this string is not visible in this part of the
# file; presumably it is the wget command's description -- confirm.
wgetdescr = '''Retrieve bytes from a URL and store them in the axon. Yields inet:urlfile nodes.
Examples:
# Specify custom headers and parameters
inet:url=https://vertex.link/foo.bar.txt | wget --headers ({"User-Agent": "Foo/Bar"}) --params ({"clientid": "42"})
# Download multiple URL targets without inbound nodes
wget https://vertex.link https://vtx.lk
'''
# JSON schema for a single permission definition ("permdef").
#
# A permdef is an object that must carry 'perm' (a list of strings), 'desc'
# and 'gate'; 'ex', 'workflowconfig' and 'default' are optional. The same
# schema is reused for the items of the 'perms' array in the package schema.
permdef_schema = {
    'type': 'object',
    'properties': {
        'perm': {
            'type': 'array',
            'items': {'type': 'string'},
        },
        'desc': {'type': 'string'},
        'gate': {'type': 'string'},
        # Example string
        'ex': {'type': 'string'},
        'workflowconfig': {'type': 'boolean'},
        'default': {
            'type': 'boolean',
            'default': False,
        },
    },
    'required': ['perm', 'desc', 'gate'],
}
# Validator for a single permission definition, built from permdef_schema.
# NOTE(review): presumably a callable that raises on invalid input -- confirm
# against synapse.lib.config.getJsValidator.
reqValidPermDef = s_config.getJsValidator(permdef_schema)
# Validator for storm package definitions ("pkgdef"), built from the JSON
# schema below. A pkgdef must provide 'name' and 'version'; every other
# top-level key is optional and unknown keys are permitted. Shared sub-schemas
# live under 'definitions' and are referenced through '$ref'.
# NOTE(review): presumably a callable that raises on invalid input -- confirm
# against synapse.lib.config.getJsValidator.
reqValidPkgdef = s_config.getJsValidator({
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'version': {
            'type': 'string',
            'pattern': s_version.semverstr,
        },
        'build': {
            # The ':' and ',' here are required: without them Python joins the
            # adjacent string literals into one bogus key and neither the type
            # nor the 'time' property is ever validated.
            'type': 'object',
            'properties': {
                'time': {'type': 'number'},
            },
            'required': ['time'],
        },
        'codesign': {
            'type': 'object',
            'properties': {
                'sign': {'type': 'string'},
                'cert': {'type': 'string'},
            },
            'required': ['cert', 'sign'],
        },
        # TODO: Remove me after Synapse 3.0.0.
        'synapse_minversion': {
            'type': ['array', 'null'],
            'items': {'type': 'number'}
        },
        'synapse_version': {
            'type': 'string',
        },
        'modules': {
            'type': ['array', 'null'],
            'items': {'$ref': '#/definitions/module'}
        },
        'docs': {
            'type': ['array', 'null'],
            'items': {'$ref': '#/definitions/doc'},
        },
        'logo': {
            'type': 'object',
            'properties': {
                'mime': {'type': 'string'},
                'file': {'type': 'string'},
            },
            'additionalProperties': True,
            'required': ['mime', 'file'],
        },
        'commands': {
            'type': ['array', 'null'],
            'items': {'$ref': '#/definitions/command'},
        },
        'graphs': {
            'type': ['array', 'null'],
            'items': s_stormlib_graph.gdefSchema,
        },
        'desc': {'type': 'string'},
        'svciden': {'type': ['string', 'null'], 'pattern': s_config.re_iden},
        'onload': {'type': 'string'},
        'author': {
            'type': 'object',
            'properties': {
                'url': {'type': 'string'},
                'name': {'type': 'string'},
            },
            'required': ['name', 'url'],
        },
        'depends': {
            'properties': {
                'requires': {'type': 'array', 'items': {'$ref': '#/definitions/require'}},
                'conflicts': {'type': 'array', 'items': {'$ref': '#/definitions/conflict'}},
            },
            'additionalProperties': True,
        },
        'perms': {
            'type': 'array',
            'items': permdef_schema,
        },
        'configvars': {
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'varname': {'type': 'string'},
                    'desc': {'type': 'string'},
                    'default': {},
                    'workflowconfig': {'type': 'boolean'},
                    'type': {'$ref': '#/definitions/configvartype'},
                    'scopes': {
                        'type': 'array',
                        'items': {
                            'type': 'string',
                            'enum': ['global', 'self']
                        },
                    },
                },
                'required': ['name', 'varname', 'desc', 'type', 'scopes'],
            },
        },
    },
    'additionalProperties': True,
    'required': ['name', 'version'],
    'definitions': {
        'doc': {
            'type': 'object',
            'properties': {
                'title': {'type': 'string'},
                'content': {'type': 'string'},
            },
            'additionalProperties': True,
            'required': ['title', 'content'],
        },
        'module': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'storm': {'type': 'string'},
                'modconf': {'type': 'object'},
                'apidefs': {
                    'type': ['array', 'null'],
                    'items': {'$ref': '#/definitions/apidef'},
                },
                'asroot': {'type': 'boolean'},
                'asroot:perms': {'type': 'array',
                    'items': {'type': 'array',
                        'items': {'type': 'string'}},
                },
            },
            'additionalProperties': True,
            'required': ['name', 'storm']
        },
        'apidef': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'desc': {'type': 'string'},
                'deprecated': {'$ref': '#/definitions/deprecatedItem'},
                'type': {
                    'type': 'object',
                    'properties': {
                        'type': {
                            'type': 'string',
                            'enum': ['function']
                        },
                        'args': {
                            'type': 'array',
                            'items': {'$ref': '#/definitions/apiarg'},
                        },
                        'returns': {
                            'type': 'object',
                            'properties': {
                                'name': {
                                    'type': 'string',
                                    'enum': ['yields'],
                                },
                                'desc': {'type': 'string'},
                                'type': {
                                    'oneOf': [
                                        {'$ref': '#/definitions/apitype'},
                                        {'type': 'array', 'items': {'$ref': '#/definitions/apitype'}},
                                    ],
                                },
                            },
                            'additionalProperties': False,
                            'required': ['type', 'desc']
                        },
                    },
                    'additionalProperties': False,
                    'required': ['type', 'returns'],
                },
            },
            'additionalProperties': False,
            'required': ['name', 'desc', 'type']
        },
        'apiarg': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'desc': {'type': 'string'},
                'type': {
                    'oneOf': [
                        {'$ref': '#/definitions/apitype'},
                        {'type': 'array', 'items': {'$ref': '#/definitions/apitype'}},
                    ],
                },
                'default': {'type': ['boolean', 'integer', 'string', 'null']},
            },
            'additionalProperties': False,
            'required': ['name', 'desc', 'type']
        },
        # Exactly one of 'eolvers' / 'eoldate' must be given (enforced by the oneOf).
        'deprecatedItem': {
            'type': 'object',
            'properties': {
                'eolvers': {'type': 'string', 'minLength': 1,
                            'description': "The version which will no longer support the item."},
                'eoldate': {'type': 'string', 'minLength': 1,
                            'description': 'Optional string indicating Synapse releases after this date may no longer support the item.'},
                'mesg': {'type': ['string', 'null'], 'default': None,
                         'description': 'Optional message to include in the warning text.'}
            },
            'oneOf': [
                {
                    'required': ['eolvers'],
                    'not': {'required': ['eoldate']}
                },
                {
                    'required': ['eoldate'],
                    'not': {'required': ['eolvers']}
                }
            ],
            'additionalProperties': False,
        },
        'apitype': {
            'type': 'string',
        },
        'command': {
            'type': 'object',
            'properties': {
                'name': {
                    'type': 'string',
                    'pattern': s_grammar.re_scmd
                },
                'cmdargs': {
                    'type': ['array', 'null'],
                    'items': {'$ref': '#/definitions/cmdarg'},
                },
                'cmdinputs': {
                    'type': ['array', 'null'],
                    'items': {'$ref': '#/definitions/cmdinput'},
                },
                'storm': {'type': 'string'},
                'forms': {'$ref': '#/definitions/cmdformhints'},
                'perms': {'type': 'array',
                    'items': {'type': 'array',
                        'items': {'type': 'string'}},
                },
            },
            'additionalProperties': True,
            'required': ['name', 'storm']
        },
        # A cmdarg is a fixed two-element array: (name, options-dict).
        'cmdarg': {
            'type': 'array',
            'items': [
                {'type': 'string'},
                {
                    'type': 'object',
                    'properties': {
                        'help': {'type': 'string'},
                        'default': {},
                        'dest': {'type': 'string'},
                        'required': {'type': 'boolean'},
                        'action': {'type': 'string'},
                        'nargs': {'type': ['string', 'integer']},
                        'choices': {
                            'type': 'array',
                            'uniqueItems': True,
                            'minItems': 1,
                        },
                        'type': {
                            'type': 'string',
                            'enum': list(s_datamodel.Model().types)
                        },
                    },
                }
            ],
            'additionalItems': False,
        },
        'cmdinput': {
            'type': 'object',
            'properties': {
                'form': {'type': 'string'},
                'help': {'type': 'string'},
            },
            'additionalProperties': True,
            'required': ['form'],
        },
        # Either a type name or an arbitrarily nested array of type names.
        'configvartype': {
            'anyOf': [
                {'type': 'array', 'items': {'$ref': '#/definitions/configvartype'}},
                {'type': 'string'},
            ]
        },
        # deprecated
        'cmdformhints': {
            'type': 'object',
            'properties': {
                'input': {
                    'type': 'array',
                    'uniqueItems': True,
                    'items': {
                        'type': 'string',
                    }
                },
                'output': {
                    'type': 'array',
                    'uniqueItems': True,
                    'items': {
                        'type': 'string',
                    }
                },
                'nodedata': {
                    'type': 'array',
                    'uniqueItems': True,
                    'items': {
                        'type': 'array',
                        'items': [
                            {'type': 'string'},
                            {'type': 'string'},
                        ],
                        'additionalItems': False,
                    },
                },
            }
        },
        'require': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'version': {'type': 'string'},
                'desc': {'type': 'string'},
                'optional': {'type': 'boolean'},
            },
            # Object schemas take 'additionalProperties'; 'additionalItems' is an
            # array keyword. True is the default, so validation is unchanged.
            'additionalProperties': True,
            'required': ('name', 'version'),
        },
        'conflict': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'version': {'type': 'string'},
                'desc': {'type': 'string'},
            },
            # See 'require' above: object keyword, same permissive behavior.
            'additionalProperties': True,
            'required': ('name',),
        },
    }
})
# Validator for storm dmon definitions ("ddef"), built from the JSON schema
# below. A ddef must provide 'iden', 'user' and 'storm'; unknown keys are
# permitted. 'stormopts' may be null or an object matching the 'stormopts'
# definition.
# NOTE(review): presumably a callable that raises on invalid input -- confirm
# against synapse.lib.config.getJsValidator.
reqValidDdef = s_config.getJsValidator({
    'type': 'object',
    'properties': {
        'name': {'type': 'string'},
        'storm': {'type': 'string'},
        # view / user / iden must all match the iden pattern.
        'view': {
            'type': 'string',
            'pattern': s_config.re_iden,
        },
        'user': {
            'type': 'string',
            'pattern': s_config.re_iden,
        },
        'iden': {
            'type': 'string',
            'pattern': s_config.re_iden,
        },
        'enabled': {
            'type': 'boolean',
            'default': True,
        },
        'stormopts': {
            'oneOf': [
                {'type': 'null'},
                {'$ref': '#/definitions/stormopts'},
            ],
        },
    },
    'additionalProperties': True,
    'required': ['iden', 'user', 'storm'],
    'definitions': {
        'stormopts': {
            'type': 'object',
            'properties': {
                'repr': {'type': 'boolean'},
                'path': {'type': 'string'},
                'show': {
                    'type': 'array',
                    'items': {'type': 'string'},
                },
            },
            'additionalProperties': True,
        },
    },
})
stormcmds = (
{
'name': 'queue.add',
'descr': 'Add a queue to the cortex.',
'cmdargs': (
('name', {'help': 'The name of the new queue.'}),
),
'storm': '''
$lib.queue.add($cmdopts.name)
$lib.print("queue added: {name}", name=$cmdopts.name)
''',
},
{
'name': 'queue.del',
'descr': 'Remove a queue from the cortex.',
'cmdargs': (
('name', {'help': 'The name of the queue to remove.'}),
),
'storm': '''
$lib.queue.del($cmdopts.name)
$lib.print("queue removed: {name}", name=$cmdopts.name)
''',
},
{
'name': 'queue.list',
'descr': 'List the queues in the cortex.',
'storm': '''
$lib.print('Storm queue list:')
for $info in $lib.queue.list() {
$name = $info.name.ljust(32)
$lib.print(" {name}: size: {size} offs: {offs}", name=$name, size=$info.size, offs=$info.offs)
}
''',
},
{
'name': 'dmon.list',
'descr': 'List the storm daemon queries running in the cortex.',
'cmdargs': (),
'storm': '''
$lib.print('Storm daemon list:')
for $info in $lib.dmon.list() {
if $info.name { $name = $info.name.ljust(20) }
else { $name = ' ' }
$lib.print(" {iden}: ({name}): {status}", iden=$info.iden, name=$name, status=$info.status)
}
''',
},
{
'name': 'feed.list',
'descr': 'List the feed functions available in the Cortex',
'cmdargs': (),
'storm': '''
$lib.print('Storm feed list:')
for $flinfo in $lib.feed.list() {
$flname = $flinfo.name.ljust(30)
$lib.print(" ({name}): {desc}", name=$flname, desc=$flinfo.desc)
}
'''
},
{
'name': 'layer.add',
'descr': 'Add a layer to the cortex.',
'cmdargs': (
('--lockmemory', {'help': 'Should the layer lock memory for performance.',
'action': 'store_true'}),
('--readonly', {'help': 'Should the layer be readonly.',
'action': 'store_true'}),
('--mirror', {'help': 'A telepath URL of an upstream layer/view to mirror.', 'type': 'str'}),
('--growsize', {'help': 'Amount to grow the map size when necessary.', 'type': 'int'}),
('--upstream', {'help': 'One or more telepath urls to receive updates from.'}),
('--name', {'help': 'The name of the layer.'}),
),
'storm': '''
$layr = $lib.layer.add($cmdopts)
$lib.print($layr.repr())
$lib.print("Layer added.")
''',
},
{
'name': 'layer.set',
'descr': 'Set a layer option.',
'cmdargs': (
('iden', {'help': 'Iden of the layer to modify.'}),
('name', {'help': 'The name of the layer property to set.'}),
('valu', {'help': 'The value to set the layer property to.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.iden)
$layr.set($cmdopts.name, $cmdopts.valu)
$lib.print($layr.repr())
$lib.print('Layer updated.')
''',
},
{
'name': 'layer.del',
'descr': 'Delete a layer from the cortex.',
'cmdargs': (
('iden', {'help': 'Iden of the layer to delete.'}),
),
'storm': '''
$lib.layer.del($cmdopts.iden)
$lib.print("Layer deleted: {iden}", iden=$cmdopts.iden)
''',
},
{
'name': 'layer.get',
'descr': 'Get a layer from the cortex.',
'cmdargs': (
('iden', {'nargs': '?',
'help': 'Iden of the layer to get. If no iden is provided, the main layer will be returned.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.iden)
$lib.print($layr.repr())
''',
},
{
'name': 'layer.list',
'descr': 'List the layers in the cortex.',
'cmdargs': (),
'storm': '''
$lib.print('Layers:')
for $layr in $lib.layer.list() {
$lib.print($layr.repr())
}
''',
},
{
'name': 'layer.pull.add',
'descr': 'Add a pull configuration to a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to pull to.'}),
('src', {'help': 'Telepath url of the source layer to pull from.'}),
('--offset', {'help': 'Layer offset to begin pulling from',
'type': 'int',
'default': 0}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$pdef = $layr.addPull($cmdopts.src, $cmdopts.offset)
if $pdef {
$lib.print("Layer pull added: {iden}", iden=$pdef.iden)
}
''',
},
{
'name': 'layer.pull.del',
'descr': 'Delete a pull configuration from a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to modify.'}),
('iden', {'help': 'Iden of the pull configuration to delete.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$retn = $layr.delPull($cmdopts.iden)
$lib.print("Layer pull deleted.")
''',
},
{
'name': 'layer.pull.list',
'descr': 'Get a list of the pull configurations for a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to retrieve pull configurations for.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$lib.print($layr.repr())
$pulls = $layr.get(pulls)
if $pulls {
$lib.print('Pull Iden | User | Time | Offset | URL')
$lib.print('------------------------------------------------------------------------------------------------------------------------------------------')
for ($iden, $pdef) in $pulls {
$user = $lib.auth.users.get($pdef.user)
if $user { $user = $user.name.ljust(20) }
else { $user = $pdef.user }
$tstr = $lib.time.format($pdef.time, '%Y-%m-%d %H:%M:%S')
$ostr = $lib.cast(str, $pdef.offs).rjust(10)
$lib.print("{iden} | {user} | {time} | {offs} | {url}", iden=$iden, time=$tstr, user=$user, offs=$ostr, url=$pdef.url)
}
} else {
$lib.print('No pulls configured.')
}
''',
},
{
'name': 'layer.push.add',
'descr': 'Add a push configuration to a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to push from.'}),
('dest', {'help': 'Telepath url of the layer to push to.'}),
('--offset', {'help': 'Layer offset to begin pushing from.',
'type': 'int',
'default': 0}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$pdef = $layr.addPush($cmdopts.dest, $cmdopts.offset)
if $pdef {
$lib.print("Layer push added: {iden}", iden=$pdef.iden)
}
''',
},
{
'name': 'layer.push.del',
'descr': 'Delete a push configuration from a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to modify.'}),
('iden', {'help': 'Iden of the push configuration to delete.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$retn = $layr.delPush($cmdopts.iden)
$lib.print("Layer push deleted.")
''',
},
{
'name': 'layer.push.list',
'descr': 'Get a list of the push configurations for a layer.',
'cmdargs': (
('layr', {'help': 'Iden of the layer to retrieve push configurations for.'}),
),
'storm': '''
$layr = $lib.layer.get($cmdopts.layr)
$lib.print($layr.repr())
$pushs = $layr.get(pushs)
if $pushs {
$lib.print('Push Iden | User | Time | Offset | URL')
$lib.print('------------------------------------------------------------------------------------------------------------------------------------------')
for ($iden, $pdef) in $pushs {
$user = $lib.auth.users.get($pdef.user)
if $user { $user = $user.name.ljust(20) }
else { $user = $pdef.user }
$tstr = $lib.time.format($pdef.time, '%Y-%m-%d %H:%M:%S')
$ostr = $lib.cast(str, $pdef.offs).rjust(10)
$lib.print("{iden} | {user} | {time} | {offs} | {url}", iden=$iden, time=$tstr, user=$user, offs=$ostr, url=$pdef.url)
}
} else {
$lib.print('No pushes configured.')
}
''',
},
{
'name': 'pkg.list',
'descr': 'List the storm packages loaded in the cortex.',
'cmdargs': (
('--verbose', {'default': False, 'action': 'store_true',
'help': 'Display build time for each package.'}),
),
'storm': '''
init {
$conf = ({
"columns": [
{"name": "name", "width": 40},
{"name": "vers", "width": 10},
],
"separators": {
"row:outline": false,
"column:outline": false,
"header:row": "#",
"data:row": "",
"column": "",
},
})
if $cmdopts.verbose {
$conf.columns.append(({"name": "time", "width": 20}))
}
$printer = $lib.tabular.printer($conf)
}
$pkgs = $lib.pkg.list()
if $($pkgs.size() > 0) {
$lib.print('Loaded storm packages:')
$lib.print($printer.header())
for $pkg in $pkgs {
$row = (
$pkg.name, $pkg.version,
)
if $cmdopts.verbose {
try {
$row.append($lib.time.format($pkg.build.time, '%Y-%m-%d %H:%M:%S'))
} catch StormRuntimeError as _ {
$row.append('not available')
}
}
$lib.print($printer.row($row))
}
} else {
$lib.print('No storm packages installed.')
}
'''
},
{
'name': 'pkg.perms.list',
'descr': 'List any permissions declared by the package.',
'cmdargs': (
('name', {'help': 'The name (or name prefix) of the package.', 'type': 'str'}),
),
'storm': '''
$pdef = $lib.null
for $pkg in $lib.pkg.list() {
if $pkg.name.startswith($cmdopts.name) {
$pdef = $pkg
break
}
}
if (not $pdef) {
$lib.warn(`Package ({$cmdopts.name}) not found!`)
} else {
if $pdef.perms {
$lib.print(`Package ({$cmdopts.name}) defines the following permissions:`)
for $permdef in $pdef.perms {
$defv = $permdef.default
if ( $defv = $lib.null ) {
$defv = $lib.false
}
$text = `{$lib.str.join('.', $permdef.perm).ljust(32)} : {$permdef.desc} ( default: {$defv} )`
$lib.print($text)
}
} else {
$lib.print(`Package ({$cmdopts.name}) contains no permissions definitions.`)
}
}
'''
},
{
'name': 'pkg.del',
'descr': 'Remove a storm package from the cortex.',
'cmdargs': (
('name', {'help': 'The name (or name prefix) of the package to remove.'}),
),
'storm': '''
$pkgs = $lib.set()
for $pkg in $lib.pkg.list() {
if $pkg.name.startswith($cmdopts.name) {
$pkgs.add($pkg.name)
}
}
if $($pkgs.size() = 0) {
$lib.print('No package names match "{name}". Aborting.', name=$cmdopts.name)
} elif $($pkgs.size() = 1) {
$name = $pkgs.list().index(0)
$lib.print('Removing package: {name}', name=$name)
$lib.pkg.del($name)
} else {
$lib.print('Multiple package names match "{name}". Aborting.', name=$cmdopts.name)
}
'''
},
{
'name': 'pkg.docs',
'descr': 'Display documentation included in a storm package.',
'cmdargs': (
('name', {'help': 'The name (or name prefix) of the package.'}),
),
'storm': '''
$pdef = $lib.null
for $pkg in $lib.pkg.list() {
if $pkg.name.startswith($cmdopts.name) {
$pdef = $pkg
break
}
}
if (not $pdef) {
$lib.warn("Package ({name}) not found!", name=$cmdopts.name)
} else {
if $pdef.docs {
for $doc in $pdef.docs {
$lib.print($doc.content)
}
} else {
$lib.print("Package ({name}) contains no documentation.", name=$cmdopts.name)
}
}
'''
},
{
'name': 'pkg.load',
'descr': 'Load a storm package from an HTTP URL.',
'cmdargs': (
('url', {'help': 'The HTTP URL to load the package from.'}),
('--raw', {'default': False, 'action': 'store_true',
'help': 'Response JSON is a raw package definition without an envelope.'}),
('--verify', {'default': False, 'action': 'store_true',
'help': 'Enforce code signature verification on the storm package.'}),
('--ssl-noverify', {'default': False, 'action': 'store_true',
'help': 'Specify to disable SSL verification of the server.'}),
),
'storm': '''
init {
$ssl = $lib.true
if $cmdopts.ssl_noverify { $ssl = $lib.false }
$resp = $lib.inet.http.get($cmdopts.url, ssl_verify=$ssl)
if ($resp.code != 200) {
$lib.warn("pkg.load got HTTP code: {code} for URL: {url}", code=$resp.code, url=$cmdopts.url)
$lib.exit()
}
$reply = $resp.json()
if $cmdopts.raw {
$pkg = $reply
} else {
if ($reply.status != "ok") {
$lib.warn("pkg.load got JSON error: {code} for URL: {url}", code=$reply.code, url=$cmdopts.url)
$lib.exit()
}
$pkg = $reply.result
}
$pkd = $lib.pkg.add($pkg, verify=$cmdopts.verify)
$lib.print("Loaded Package: {name} @{version}", name=$pkg.name, version=$pkg.version)
}
''',
},
{
'name': 'version',
'descr': 'Show version metadata relating to Synapse.',
'storm': '''
$comm = $lib.version.commit()
$synv = $lib.version.synapse()
if $synv {
$synv = $lib.str.join('.', $synv)
}
if $comm {
$comm = $comm.slice(0,7)
}
$lib.print('Synapse Version: {s}', s=$synv)
$lib.print('Commit Hash: {c}', c=$comm)
''',
},
{
'name': 'view.add',
'descr': 'Add a view to the cortex.',
'cmdargs': (
('--name', {'default': None, 'help': 'The name of the new view.'}),
('--worldreadable', {'type': 'bool', 'default': False, 'help': 'Grant read access to the `all` role.'}),
('--layers', {'default': [], 'nargs': '*', 'help': 'Layers for the view.'}),
),
'storm': '''
$view = $lib.view.add($cmdopts.layers, name=$cmdopts.name, worldreadable=$cmdopts.worldreadable)
$lib.print($view.repr())
$lib.print("View added.")
''',
},
{
'name': 'view.del',
'descr': 'Delete a view from the cortex.',
'cmdargs': (
('iden', {'help': 'Iden of the view to delete.'}),
),
'storm': '''
$lib.view.del($cmdopts.iden)
$lib.print("View deleted: {iden}", iden=$cmdopts.iden)
''',
},
{
'name': 'view.set',
'descr': 'Set a view option.',
'cmdargs': (
('iden', {'help': 'Iden of the view to modify.'}),
('name', {'help': 'The name of the view property to set.'}),
('valu', {'help': 'The value to set the view property to.'}),
),
'storm': '''
$view = $lib.view.get($cmdopts.iden)
$view.set($cmdopts.name, $cmdopts.valu)
$lib.print($view.repr())
$lib.print("View updated.")
''',
},
{
'name': 'view.fork',
'descr': 'Fork a view in the cortex.',
'cmdargs': (
('iden', {'help': 'Iden of the view to fork.'}),
('--name', {'default': None, 'help': 'Name for the newly forked view.'}),
),
'storm': '''
$forkview = $lib.view.get($cmdopts.iden).fork(name=$cmdopts.name)
$lib.print($forkview.repr())
$lib.print("View {iden} forked to new view: {forkiden}", iden=$cmdopts.iden, forkiden=$forkview.iden)
''',
},
{
'name': 'view.get',
'descr': 'Get a view from the cortex.',
'cmdargs': (
('iden', {'nargs': '?',
'help': 'Iden of the view to get. If no iden is provided, the main view will be returned.'}),
),
'storm': '''
$view = $lib.view.get($cmdopts.iden)
$lib.print($view.repr())
''',
},
{
'name': 'view.list',
'descr': 'List the views in the cortex.',
'cmdargs': (),
'storm': '''
$lib.print("")
for $view in $lib.view.list() {
$lib.print($view.repr())
$lib.print("")
}
''',
},
{
'name': 'view.merge',
'descr': 'Merge a forked view into its parent view.',
'cmdargs': (
('iden', {'help': 'Iden of the view to merge.'}),
('--delete', {'default': False, 'action': 'store_true',
'help': 'Once the merge is complete, delete the layer and view.'}),
),
'storm': '''
$view = $lib.view.get($cmdopts.iden)
$view.merge()
if $cmdopts.delete {
$layriden = $view.pack().layers.index(0).iden
$lib.view.del($view.iden)
$lib.layer.del($layriden)
} else {
$view.swapLayer()
}
$lib.print("View merged: {iden}", iden=$cmdopts.iden)
''',
},
{
'name': 'trigger.add',
'descr': addtriggerdescr,
'cmdargs': (
('condition', {'help': 'Condition for the trigger.'}),
('--form', {'help': 'Form to fire on.'}),
('--tag', {'help': 'Tag to fire on.'}),
('--prop', {'help': 'Property to fire on.'}),
('--verb', {'help': 'Edge verb to fire on.'}),
('--n2form', {'help': 'The form of the n2 node to fire on.'}),
('--query', {'help': 'Query for the trigger to execute.', 'required': True,
'dest': 'storm', }),
('--async', {'default': False, 'action': 'store_true',
'help': 'Make the trigger run in the background.'}),
('--disabled', {'default': False, 'action': 'store_true',
'help': 'Create the trigger in disabled state.'}),
('--name', {'help': 'Human friendly name of the trigger.'}),
('--view', {'help': 'The view to add the trigger to.'})
),
'storm': '''
$opts = $lib.copy($cmdopts)
// Set valid tdef keys
$opts.enabled = (not $opts.disabled)
$opts.help = $lib.undef
$opts.disabled = $lib.undef
$trig = $lib.trigger.add($opts)
$lib.print("Added trigger: {iden}", iden=$trig.iden)
''',
},
{
'name': 'trigger.del',
'descr': 'Delete a trigger from the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid trigger iden is accepted.'}),
),
'storm': '''
$iden = $lib.trigger.del($cmdopts.iden)
$lib.print("Deleted trigger: {iden}", iden=$iden)
''',
},
{
'name': 'trigger.mod',
'descr': "Modify an existing trigger's query.",
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid trigger iden is accepted.'}),
('query', {'help': 'New storm query for the trigger.'}),
),
'storm': '''
$iden = $lib.trigger.mod($cmdopts.iden, $cmdopts.query)
$lib.print("Modified trigger: {iden}", iden=$iden)
''',
},
{
'name': 'trigger.list',
'descr': "List existing triggers in the cortex.",
'cmdargs': (
('--all', {'help': 'List every trigger in every readable view, rather than just the current view.', 'action': 'store_true'}),
),
'storm': '''
$triggers = $lib.trigger.list($cmdopts.all)
if $triggers {
$lib.print("user iden view en? async? cond object storm query")
for $trigger in $triggers {
$user = $trigger.username.ljust(10)
$iden = $trigger.iden.ljust(12)
$view = $trigger.view.ljust(12)
($ok, $async) = $lib.trycast(bool, $trigger.async)
if $ok {
$async = $lib.model.type(bool).repr($async).ljust(6)
} else {
$async = $lib.model.type(bool).repr($lib.false).ljust(6)
}
$enabled = $lib.model.type(bool).repr($trigger.enabled).ljust(6)
$cond = $trigger.cond.ljust(9)
$fo = ""
if $trigger.form {
$fo = $trigger.form
}
$pr = ""
if $trigger.prop {
$pr = $trigger.prop
}
if $cond.startswith('tag:') {
$obj = $fo.ljust(14)
$obj2 = $trigger.tag.ljust(10)
} else {
if $pr {
$obj = $pr.ljust(14)
} elif $fo {
$obj = $fo.ljust(14)
} else {
$obj = '<missing> '
}
$obj2 = ' '
}
$lib.print(`{$user} {$iden} {$view} {$enabled} {$async} {$cond} {$obj} {$obj2} {$trigger.storm}`)
}
} else {
$lib.print("No triggers found")
}
''',
},
{
'name': 'trigger.enable',
'descr': 'Enable a trigger in the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid trigger iden is accepted.'}),
),
'storm': '''
$iden = $lib.trigger.enable($cmdopts.iden)
$lib.print("Enabled trigger: {iden}", iden=$iden)
''',
},
{
'name': 'trigger.disable',
'descr': 'Disable a trigger in the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid trigger iden is accepted.'}),
),
'storm': '''
$iden = $lib.trigger.disable($cmdopts.iden)
$lib.print("Disabled trigger: {iden}", iden=$iden)
''',
},
{
'name': 'cron.add',
'descr': addcrondescr,
'cmdargs': (
('query', {'help': 'Query for the cron job to execute.'}),
('--pool', {'action': 'store_true', 'default': False,
'help': 'Allow the cron job to be run by a mirror from the query pool.'}),
('--minute', {'help': 'Minute value for job or recurrence period.'}),
('--name', {'help': 'An optional name for the cron job.'}),
('--doc', {'help': 'An optional doc string for the cron job.'}),
('--hour', {'help': 'Hour value for job or recurrence period.'}),
('--day', {'help': 'Day value for job or recurrence period.'}),
('--month', {'help': 'Month value for job or recurrence period.'}),
('--year', {'help': 'Year value for recurrence period.'}),
('--hourly', {'help': 'Fixed parameters for an hourly job.'}),
('--daily', {'help': 'Fixed parameters for a daily job.'}),
('--monthly', {'help': 'Fixed parameters for a monthly job.'}),
('--yearly', {'help': 'Fixed parameters for a yearly job.'}),
('--iden', {'help': 'Fixed iden to assign to the cron job'}),
('--view', {'help': 'View to run the cron job against'}),
),
'storm': '''
$cron = $lib.cron.add(query=$cmdopts.query,
minute=$cmdopts.minute,
hour=$cmdopts.hour,
day=$cmdopts.day,
pool=$cmdopts.pool,
month=$cmdopts.month,
year=$cmdopts.year,
hourly=$cmdopts.hourly,
daily=$cmdopts.daily,
monthly=$cmdopts.monthly,
yearly=$cmdopts.yearly,
iden=$cmdopts.iden,
view=$cmdopts.view,)
if $cmdopts.doc { $cron.set(doc, $cmdopts.doc) }
if $cmdopts.name { $cron.set(name, $cmdopts.name) }
$lib.print("Created cron job: {iden}", iden=$cron.iden)
''',
},
{
'name': 'cron.at',
'descr': atcrondescr,
'cmdargs': (
('query', {'help': 'Query for the cron job to execute.'}),
('--minute', {'help': 'Minute(s) to execute at.'}),
('--hour', {'help': 'Hour(s) to execute at.'}),
('--day', {'help': 'Day(s) to execute at.'}),
('--dt', {'help': 'Datetime(s) to execute at.'}),
('--now', {'help': 'Execute immediately.', 'default': False, 'action': 'store_true'}),
('--iden', {'help': 'A set iden to assign to the new cron job'}),
('--view', {'help': 'View to run the cron job against'}),
),
'storm': '''
$cron = $lib.cron.at(query=$cmdopts.query,
minute=$cmdopts.minute,
hour=$cmdopts.hour,
day=$cmdopts.day,
dt=$cmdopts.dt,
now=$cmdopts.now,
iden=$cmdopts.iden,
view=$cmdopts.view)
$lib.print("Created cron job: {iden}", iden=$cron.iden)
''',
},
{
'name': 'cron.del',
'descr': 'Delete a cron job from the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
),
'storm': '''
$lib.cron.del($cmdopts.iden)
$lib.print("Deleted cron job: {iden}", iden=$cmdopts.iden)
''',
},
{
'name': 'cron.move',
'descr': "Move a cron job from one view to another",
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
('view', {'help': 'View to move the cron job to.'}),
),
'storm': '''
$iden = $lib.cron.move($cmdopts.iden, $cmdopts.view)
$lib.print("Moved cron job {iden} to view {view}", iden=$iden, view=$cmdopts.view)
''',
},
{
'name': 'cron.mod',
'descr': "Modify an existing cron job's query.",
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
('query', {'help': 'New storm query for the cron job.'}),
),
'storm': '''
$iden = $lib.cron.mod($cmdopts.iden, $cmdopts.query)
$lib.print("Modified cron job: {iden}", iden=$iden)
''',
},
{
'name': 'cron.cleanup',
'descr': "Delete all completed at jobs",
'cmdargs': (),
'storm': '''
$crons = $lib.cron.list()
$count = 0
if $crons {
for $cron in $crons {
$job = $cron.pack()
if (not $job.recs) {
$lib.cron.del($job.iden)
$count = ($count + 1)
}
}
}
$lib.print("{count} cron/at jobs deleted.", count=$count)
''',
},
{
'name': 'cron.list',
'descr': "List existing cron jobs in the cortex.",
'cmdargs': (),
'storm': '''
init {
$conf = ({
"columns": [
{"name": "user", "width": 24},
{"name": "iden", "width": 10},
{"name": "view", "width": 10},
{"name": "en?", "width": 3},
{"name": "rpt?", "width": 4},
{"name": "now?", "width": 4},
{"name": "err?", "width": 4},
{"name": "# start", "width": 7},
{"name": "last start", "width": 16},
{"name": "last end", "width": 16},
{"name": "query", "newlines": "split"},
],
"separators": {
"row:outline": false,
"column:outline": false,
"header:row": "#",
"data:row": "",
"column": "",
},
})
$printer = $lib.tabular.printer($conf)
}
$crons = $lib.cron.list()
if $crons {
$lib.print($printer.header())
for $cron in $crons {
$job = $cron.pprint()
$row = (
$job.user, $job.idenshort, $job.viewshort, $job.enabled,
$job.isrecur, $job.isrunning, $job.iserr, `{$job.startcount}`,
$job.laststart, $job.lastend, $job.query
)
$lib.print($printer.row($row))
}
} else {
$lib.print("No cron jobs found")
}
''',
},
{
'name': 'cron.stat',
'descr': "Gives detailed information about a cron job.",
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
),
'storm': '''
$cron = $lib.cron.get($cmdopts.iden)
if $cron {
$job = $cron.pprint()
$lib.print('iden: {iden}', iden=$job.iden)
$lib.print('user: {user}', user=$job.user)
$lib.print('enabled: {enabled}', enabled=$job.enabled)
$lib.print(`pool: {$job.pool}`)
$lib.print('recurring: {isrecur}', isrecur=$job.isrecur)
$lib.print('# starts: {startcount}', startcount=$job.startcount)
$lib.print('# errors: {errcount}', errcount=$job.errcount)
$lib.print('last start time: {laststart}', laststart=$job.laststart)
$lib.print('last end time: {lastend}', lastend=$job.lastend)
$lib.print('last result: {lastresult}', lastresult=$job.lastresult)
$lib.print('query: {query}', query=$job.query)
if $lib.len($job.lasterrs) {
$lib.print('most recent errors:')
for $err in $job.lasterrs {
$lib.print(' {err}', err=$err)
}
}
if $job.recs {
$lib.print('entries: incunit incval required')
for $rec in $job.recs {
$incunit = $lib.str.format('{incunit}', incunit=$rec.incunit).ljust(10)
$incval = $lib.str.format('{incval}', incval=$rec.incval).ljust(6)
$lib.print(' {incunit} {incval} {reqdict}',
incunit=$incunit, incval=$incval, reqdict=$rec.reqdict)
}
} else {
$lib.print('entries: <None>')
}
}
''',
},
{
'name': 'cron.enable',
'descr': 'Enable a cron job in the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
),
'storm': '''
$iden = $lib.cron.enable($cmdopts.iden)
$lib.print("Enabled cron job: {iden}", iden=$iden)
''',
},
{
'name': 'cron.disable',
'descr': 'Disable a cron job in the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid cron job iden is accepted.'}),
),
'storm': '''
$iden = $lib.cron.disable($cmdopts.iden)
$lib.print("Disabled cron job: {iden}", iden=$iden)
''',
},
{
'name': 'ps.list',
'descr': 'List running tasks in the cortex.',
'cmdargs': (
('--verbose', {'default': False, 'action': 'store_true', 'help': 'Enable verbose output.'}),
),
'storm': '''
$tasks = $lib.ps.list()
for $task in $tasks {
$lib.print("task iden: {iden}", iden=$task.iden)
$lib.print(" name: {name}", name=$task.name)
$lib.print(" user: {user}", user=$task.user)
$lib.print(" status: {status}", status=$task.status)
$lib.print(" start time: {start}", start=$lib.time.format($task.tick, '%Y-%m-%d %H:%M:%S'))
$lib.print(" metadata:")
if $cmdopts.verbose {
$lib.pprint($task.info, prefix=' ')
} else {
$lib.pprint($task.info, prefix=' ', clamp=120)
}
}
$lib.print("{tlen} tasks found.", tlen=$tasks.size())
''',
},
{
'name': 'ps.kill',
'descr': 'Kill a running task/query within the cortex.',
'cmdargs': (
('iden', {'help': 'Any prefix that matches exactly one valid process iden is accepted.'}),
),
'storm': '''
$kild = $lib.ps.kill($cmdopts.iden)
$lib.print("kill status: {kild}", kild=$kild)
''',
},
{
'name': 'wget',
'descr': wgetdescr,
'cmdargs': (
('urls', {'nargs': '*', 'help': 'URLs to download.'}),
('--no-ssl-verify', {'default': False, 'action': 'store_true', 'help': 'Ignore SSL certificate validation errors.'}),
('--timeout', {'default': 300, 'type': 'int', 'help': 'Configure the timeout for the download operation.'}),
('--params', {'default': None, 'help': 'Provide a dict containing url parameters.'}),
('--headers', {
'default': {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.131 Safari/537.36',
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.9',
},
'help': 'Provide a Storm dict containing custom request headers.'}),
('--no-headers', {'default': False, 'action': 'store_true', 'help': 'Do NOT use any default headers.'}),
),
'storm': '''
init {
$count = (0)
$params = $cmdopts.params
$headers = $cmdopts.headers
if $cmdopts.no_headers { $headers = $lib.null }
}
$ssl = (not $cmdopts.no_ssl_verify)
$timeout = $cmdopts.timeout
if $node {
$count = ($count + 1)
if $cmdopts.urls {
$urls = $cmdopts.urls
} else {
if ($node.form() != "inet:url") {
$lib.warn("wget can only take inet:url nodes as input without args.")
$lib.exit()
}
$urls = ($node.value(),)
}
for $url in $urls {
-> { yield $lib.axon.urlfile($url, params=$params, headers=$headers, ssl=$ssl, timeout=$timeout) }
}
}
if ($count = 0) {
for $url in $cmdopts.urls {
yield $lib.axon.urlfile($url, params=$params, headers=$headers, ssl=$ssl, timeout=$timeout)
}
}
''',
},
{
'name': 'nodes.import',
'descr': 'Import a nodes file hosted at a URL into the cortex. Yields created nodes.',
'cmdargs': (
('urls', {'nargs': '*', 'help': 'URL(s) to fetch nodes file from'}),
('--no-ssl-verify', {'default': False, 'action': 'store_true', 'help': 'Ignore SSL certificate validation errors.'}),
),
'storm': '''
init {
$count = (0)
function fetchnodes(url, ssl) {
$resp = $lib.inet.http.get($url, ssl_verify=$ssl)
if ($resp.code = 200) {
$nodes = $lib.list()
for $valu in $resp.msgpack() {
$nodes.append($valu)
}
yield $lib.feed.genr("syn.nodes", $nodes)
} else {
$lib.exit("nodes.import got HTTP error code: {code} for {url}", code=$resp.code, url=$url)
}
}
}
$ssl = (not $cmdopts.no_ssl_verify)
if $node {
$count = ($count + 1)
if ($node.form() != "inet:url") {
$lib.exit("nodes.import can only take inet:url nodes as input without args")
}
$inurls = ($node.value(),)
for $url in $inurls {
-> { yield $fetchnodes($url, $ssl) }
}
}
if ($count = 0) {
for $url in $cmdopts.urls {
for $valu in $fetchnodes($url, $ssl) {
yield $valu
}
}
}
''',
},
{
'name': 'note.add',
'descr': 'Add a new meta:note node and link it to the inbound nodes using an -(about)> edge.',
'cmdargs': (
('text', {'type': 'str', 'help': 'The note text to add to the nodes.'}),
('--type', {'type': 'str', 'help': 'The note type.'}),
('--yield', {'default': False, 'action': 'store_true',
'help': 'Yield the newly created meta:note node.'}),
),
'storm': '''
init {
function addNoteNode(text, type) {
if $type { $type = $lib.cast(meta:note:type:taxonomy, $type) }
[ meta:note=* :text=$text :creator=$lib.user.iden :created=.created :updated=.created ]
if $type {[ :type=$type ]}
return($node)
}
$yield = $cmdopts.yield
$note = $addNoteNode($cmdopts.text, $cmdopts.type)
}
[ <(about)+ { yield $note } ]
if $yield { spin }
if $yield { yield $note }
''',
},
{
'name': 'uptime',
'descr': 'Print the uptime for the Cortex or a connected service.',
'cmdargs': (
('name', {'type': 'str', 'nargs': '?',
'help': 'The name, or iden, of the service (if not provided defaults to the Cortex).'}),
),
'storm': '''
$resp = $lib.cell.uptime(name=$cmdopts.name)
$uptime = $lib.model.type(duration).repr($resp.uptime)
$starttime = $lib.time.format($resp.starttime, "%Y-%m-%d %H:%M:%S")
$lib.print("up {uptime} (since {since})", uptime=$uptime, since=$starttime)
''',
},
)
[docs]
@s_cache.memoize(size=1024)
def queryhash(text):
    '''
    Return the hex MD5 digest for a storm query text string.

    The text is encoded using the ``surrogatepass`` error handler and the
    digest is explicitly flagged as not being used for security purposes.

    Args:
        text (str): The query text to hash.

    Returns:
        str: The hex digest of the encoded text.
    '''
    byts = text.encode(errors='surrogatepass')
    digest = hashlib.md5(byts, usedforsecurity=False)
    return digest.hexdigest()
[docs]
class DmonManager(s_base.Base):
    '''
    Manager for StormDmon objects.

    Holds the StormDmon instances by iden and starts / stops them as a group.
    '''
    async def __anit__(self, core):
        await s_base.Base.__anit__(self)

        self.enabled = False    # set by start() and cleared by stop()
        self.dmons = {}         # iden -> StormDmon
        self.core = core

        self.onfini(self._finiAllDmons)

    async def _finiAllDmons(self):
        # fini every known dmon concurrently
        finis = [dmon.fini() for dmon in self.dmons.values()]
        await asyncio.gather(*finis)

    async def _stopAllDmons(self):
        # stop every known dmon concurrently ( no logging when there are none )
        stops = [dmon.stop() for dmon in self.dmons.values()]
        if stops:
            count = len(stops)
            logger.debug(f'Stopping [{count}] Dmons')
            await asyncio.gather(*stops)
            logger.debug('Stopped Dmons')

    async def addDmon(self, iden, ddef):
        '''
        Construct a StormDmon for the definition and register it by iden.

        The dmon is only run immediately when the manager is enabled and the
        definition does not disable it.

        Returns:
            StormDmon: The newly constructed dmon.
        '''
        dmon = self.dmons[iden] = await StormDmon.anit(self.core, iden, ddef)

        # TODO Remove default=True when dmon enabled CRUD is implemented
        autorun = self.enabled and ddef.get('enabled', True)
        if autorun:
            await dmon.run()

        return dmon

    def getDmonRunlog(self, iden):
        '''
        Return the run log for the dmon, or an empty tuple if the iden is unknown.
        '''
        dmon = self.dmons.get(iden)
        if dmon is None:
            return ()
        return dmon._getRunLog()

    def getDmon(self, iden):
        '''
        Return the StormDmon registered for the iden ( or None ).
        '''
        return self.dmons.get(iden)

    def getDmonDef(self, iden):
        '''
        Return the packed definition for the dmon ( or None ).
        '''
        dmon = self.dmons.get(iden)
        if not dmon:
            return None
        return dmon.pack()

    def getDmonDefs(self):
        '''
        Return a list of the packed definitions for every registered dmon.
        '''
        return [dmon.pack() for dmon in self.dmons.values()]

    async def popDmon(self, iden):
        '''Remove the dmon and fini it if it exists.'''
        dmon = self.dmons.pop(iden, None)
        if not dmon:
            return
        await dmon.fini()

    async def start(self):
        '''
        Start all the dmons.
        '''
        if self.enabled:
            return

        # snapshot the values so the dict may change while we await
        todo = list(self.dmons.values())
        if todo:
            logger.debug('Starting Dmons')
            for dmon in todo:
                await dmon.run()
            self.enabled = True
            logger.debug('Started Dmons')
            return

        self.enabled = True

    async def stop(self):
        '''
        Stop all the dmons.
        '''
        if self.enabled:
            await self._stopAllDmons()
            self.enabled = False
[docs]
class StormDmon(s_base.Base):
    '''
    A background storm runtime which is restarted by the cortex.
    '''
    async def __anit__(self, core, iden, ddef):
        await s_base.Base.__anit__(self)

        self.core = core
        self.iden = iden
        self.ddef = ddef

        self.user = core.auth.user(ddef.get('user'))
        self.enabled = ddef.get('enabled')
        self.task = None    # the scheduled dmonloop() task while running

        self.status = 'initialized'
        self.count = 0      # number of items yielded by the dmon query
        self.runlog = collections.deque((), 2000)  # bounded (time, mesg) history
        self.err_evnt = asyncio.Event()

        self.onfini(self.stop)

    async def stop(self):
        '''
        Cancel the running dmon task ( if any ) and forget it.
        '''
        extra = {'synapse': {'iden': self.iden}}
        logger.debug(f'Stopping Dmon {self.iden}', extra=extra)

        task = self.task
        if task is not None:
            task.cancel()
        self.task = None

        logger.debug(f'Stopped Dmon {self.iden}', extra=extra)

    async def run(self):
        '''
        Schedule the dmon loop.

        Raises:
            s_exc.SynErr: If the dmon already has a task.
        '''
        if self.task:  # pragma: no cover
            mesg = f'Dmon - {self.iden} - has a current task and cannot start a new one.'
            raise s_exc.SynErr(mesg=mesg, iden=self.iden)

        self.task = self.schedCoro(self.dmonloop())

    async def bump(self):
        '''
        Restart the dmon by stopping and then running it.
        '''
        await self.stop()
        await self.run()

    def pack(self):
        '''
        Return a copy of the dmon definition with count, status and err added.
        '''
        retn = dict(self.ddef)
        retn.update({
            'count': self.count,
            'status': self.status,
            'err': self.err_evnt.is_set(),
        })
        return retn

    def _runLogAdd(self, mesg):
        # entries are (time, mesg) tuples
        entry = (s_common.now(), mesg)
        self.runlog.append(entry)

    def _getRunLog(self):
        return list(self.runlog)

    async def dmonloop(self):
        '''
        Run the dmon storm query in a loop until the dmon is fini.

        The loop exits early ( with a fatal status ) if the dmon user is locked
        or the view can not be resolved. Errors from the query are recorded in
        the run log and the query is retried after a short wait.
        '''
        extra = {'synapse': {'iden': self.iden}}

        logger.debug(f'Starting Dmon {self.iden}', extra=extra)

        s_scope.set('user', self.user)
        s_scope.set('storm:dmon', self.iden)

        text = self.ddef.get('storm')
        opts = self.ddef.get('stormopts', {})

        # NOTE: when stormopts is present in the ddef, the vars are written back into that same dict.
        stormvars = await s_stormtypes.toprim(opts.get('vars', {}), use_list=True)
        stormvars.setdefault('auto', {'iden': self.iden, 'type': 'dmon'})
        opts['vars'] = stormvars

        viewiden = opts.get('view')

        info = {
            'iden': self.iden,
            'name': self.ddef.get('name', 'storm dmon'),
            'view': viewiden,
        }
        await self.core.boss.promote('storm:dmon', user=self.user, info=info)

        def dmonPrint(evnt):
            # record print events and mirror them to the logger
            self._runLogAdd(evnt)
            mesg = evnt[1].get('mesg', '')
            logger.info(f'Dmon - {self.iden} - {mesg}', extra=extra)

        def dmonWarn(evnt):
            # record warn events and mirror them to the logger
            self._runLogAdd(evnt)
            mesg = evnt[1].get('mesg', '')
            logger.warning(f'Dmon - {self.iden} - {mesg}', extra=extra)

        while not self.isfini:

            if self.user.info.get('locked'):
                self.status = 'fatal error: user locked'
                logger.warning(f'Dmon user is locked. Stopping Dmon {self.iden}.', extra=extra)
                return

            view = self.core.getView(viewiden, user=self.user)
            if view is None:
                self.status = 'fatal error: invalid view'
                logger.warning(f'Dmon View is invalid. Stopping Dmon {self.iden}.', extra=extra)
                return

            try:

                self.status = 'running'
                async with await self.core.snap(user=self.user, view=view) as snap:
                    snap.cachebuids = False

                    snap.on('warn', dmonWarn)
                    snap.on('print', dmonPrint)
                    self.err_evnt.clear()

                    async for _ in snap.storm(text, opts=opts, user=self.user):
                        # all storm tasks yield often to prevent latency
                        self.count += 1
                        await asyncio.sleep(0)

                    logger.warning(f'Dmon query exited: {self.iden}', extra=extra)

                    self.status = 'sleeping'

            except s_stormctrl.StormExit:
                self.status = 'sleeping'

            except asyncio.CancelledError:
                self.status = 'stopped'
                raise

            except Exception as e:
                self._runLogAdd(('err', s_common.excinfo(e)))
                logger.exception(f'Dmon error ({self.iden})', extra=extra)
                self.status = f'error: {e}'
                self.err_evnt.set()

            # bottom of the loop... wait it out
            await self.waitfini(timeout=1)
[docs]
class Runtime(s_base.Base):
    '''
    A Runtime represents the instance of a running query.

    The runtime should maintain a firm API boundary using the snap.
    Parallel query execution requires that the snap be treated as an
    opaque object which is called through, but not dereferenced.
    '''

    # the reason returned by allowedReason() when running asroot
    _admin_reason = s_auth._allowedReason(True, isadmin=True)

    async def __anit__(self, query, snap, opts=None, user=None, root=None):
        '''
        Initialize the runtime.

        Args:
            query: The query object to execute.
            snap: The snap which the runtime calls through.
            opts (dict): Optional runtime options ( debug, readonly, vars, graph ).
            user: The user the runtime executes as.
            root (Runtime): Optional parent runtime whose scope is inherited.
        '''
        await s_base.Base.__anit__(self)

        if opts is None:
            opts = {}

        self.vars = {}
        self.ctors = {
            'lib': s_stormtypes.LibBase,
        }

        self.opts = opts
        self.snap = snap
        self.user = user
        self.debug = opts.get('debug', False)
        self.asroot = False

        self.root = root
        self.funcscope = False

        self.query = query

        self.spawn_log_conf = await self.snap.core._getSpawnLogConf()

        self.readonly = opts.get('readonly', False)  # EXPERIMENTAL: Make it safe to run untrusted queries
        self.model = snap.core.getDataModel()

        self.task = asyncio.current_task()
        self.emitq = None

        self.inputs = []    # [synapse.lib.node.Node(), ...]

        self.iden = s_common.guid()

        varz = self.opts.get('vars')
        if varz is not None:
            # take a reference on Base values since _onRuntFini() will fini() them
            for valu in varz.values():
                if isinstance(valu, s_base.Base):
                    valu.incref()
            self.vars.update(varz)

        self._initRuntVars(query)

        self.proxies = {}

        self.onfini(self._onRuntFini)

    def _initRuntVars(self, query):
        '''
        Populate self.runtvars ( name -> is runtsafe ) for this runtime.
        '''
        # declare path builtins as non-runtsafe
        self.runtvars = {
            'node': False,
            'path': False,
        }

        # inherit runtsafe vars from our root
        if self.root is not None:
            self.runtvars.update(self.root.runtvars)
            self.runtvars.update({k: True for k in self.root.getScopeVars().keys()})

        # all vars/ctors are de-facto runtsafe
        self.runtvars.update({k: True for k in self.vars.keys()})
        self.runtvars.update({k: True for k in self.ctors.keys()})

        self._loadRuntVars(query)

    def getScopeVars(self):
        '''
        Return a dict of all the vars within this and all parent scopes.
        '''
        varz = {}
        if self.root:
            varz.update(self.root.getScopeVars())

        # our own vars take precedence over the parent scopes
        varz.update(self.vars)
        return varz

    async def emitter(self):
        '''
        Schedule execution of the runtime and return an async generator of emitted items.

        Items are handed from emit() to the generator through a queue of size one.
        The generator finis the runtime when it exits and re-raises any exception
        raised by the execution.
        '''
        self.emitq = asyncio.Queue(maxsize=1)

        # created before the producer is scheduled so emit() always has an event to use
        self.emitevt = asyncio.Event()

        async def fill():
            # drive the query; the queue gets (False, None) when done or (False, exc) on error
            try:
                async for item in self.execute():
                    await asyncio.sleep(0)
                await self.emitq.put((False, None))

            except asyncio.CancelledError:  # pragma: no cover
                raise

            except s_stormctrl.StormStop:
                await self.emitq.put((False, None))

            except Exception as e:
                await self.emitq.put((False, e))

        self.schedCoro(fill())

        async def genr():

            async with self:
                while not self.isfini:
                    ok, item = await self.emitq.get()
                    if ok:
                        yield item
                        # release the emit() call which is waiting on the consumer
                        self.emitevt.set()
                        continue

                    # (False, None) signals normal completion
                    if item is None:
                        return

                    raise item

        return genr()

    async def emit(self, item):
        '''
        Emit an item to the consumer of emitter() and wait until it has been consumed.

        Raises:
            s_exc.StormRuntimeError: If the runtime is not running as an emitter.
        '''
        if self.emitq is None:
            mesg = 'Cannot emit from outside of an emitter function'
            raise s_exc.StormRuntimeError(mesg=mesg)

        self.emitevt.clear()
        await self.emitq.put((True, item))
        await self.emitevt.wait()

    async def _onRuntFini(self):
        # fini() any Base objects constructed by this runtime
        for valu in list(self.vars.values()):
            if isinstance(valu, s_base.Base):
                await valu.fini()

    async def reqGateKeys(self, gatekeys):
        '''
        Require the gatekeys via the cortex ( skipped when running asroot ).
        '''
        if self.asroot:
            return
        await self.snap.core.reqGateKeys(gatekeys)

    async def reqUserCanReadLayer(self, layriden):
        '''
        Require that the user may read the layer.

        The user is allowed if they have ('view', 'read') on any view listed for
        the layer or ('layer', 'read') on the layer itself.

        Raises:
            s_exc.AuthDeny: If the user may not read the layer.
        '''
        if self.asroot:
            return

        for view in self.snap.core.viewsbylayer.get(layriden, ()):
            if self.user.allowed(('view', 'read'), gateiden=view.iden):
                return

        # check the old way too...
        if self.user.allowed(('layer', 'read'), gateiden=layriden):
            return

        mesg = f'User ({self.user.name}) can not read layer.'
        raise s_exc.AuthDeny(mesg=mesg, user=self.user.iden, username=self.user.name)

    async def dyncall(self, iden, todo, gatekeys=()):
        '''
        Call through to the cortex dyncall() API.
        '''
        # bypass all perms checks if we are running asroot
        if self.asroot:
            gatekeys = ()
        return await self.snap.core.dyncall(iden, todo, gatekeys=gatekeys)

    async def dyniter(self, iden, todo, gatekeys=()):
        '''
        Yield items from the cortex dyniter() API.
        '''
        # bypass all perms checks if we are running asroot
        if self.asroot:
            gatekeys = ()
        async for item in self.snap.core.dyniter(iden, todo, gatekeys=gatekeys):
            yield item

    async def getStormQuery(self, text):
        '''
        Return the query object for the storm text from the cortex.
        '''
        return await self.snap.core.getStormQuery(text)

    async def coreDynCall(self, todo, perm=None):
        '''
        Make a dyncall to the 'cortex' target, optionally gated by a permission.
        '''
        gatekeys = ()
        if perm is not None:
            gatekeys = ((self.user.iden, perm, None),)
        # bypass all perms checks if we are running asroot
        if self.asroot:
            gatekeys = ()
        return await self.snap.core.dyncall('cortex', todo, gatekeys=gatekeys)

    async def getTeleProxy(self, url, **opts):
        '''
        Return a telepath proxy for the URL and options, opening one if needed.

        Proxies are cached per ( url, opts ) and are fini'd along with the snap.
        '''
        flat = tuple(sorted(opts.items()))
        prox = self.proxies.get((url, flat))
        if prox is not None:
            return prox

        prox = await s_telepath.openurl(url, **opts)

        self.proxies[(url, flat)] = prox
        self.snap.onfini(prox.fini)

        return prox

    def isRuntVar(self, name):
        '''
        Return True if the named variable is runtsafe.
        '''
        return bool(self.runtvars.get(name))

    async def printf(self, mesg):
        return await self.snap.printf(mesg)

    async def warn(self, mesg, **info):
        return await self.snap.warn(mesg, **info)

    async def warnonce(self, mesg, **info):
        return await self.snap.warnonce(mesg, **info)

    def cancel(self):
        '''
        Cancel the task which was current when the runtime was constructed.
        '''
        self.task.cancel()

    def initPath(self, node):
        '''
        Return a new Path for the node seeded with a copy of the runtime vars.
        '''
        return s_node.Path(dict(self.vars), [node])

    def getOpt(self, name, defval=None):
        return self.opts.get(name, defval)

    def setOpt(self, name, valu):
        self.opts[name] = valu

    def getVar(self, name, defv=None):
        '''
        Return the value of a variable.

        Lookup order is the local vars, then the ctors ( the constructed value
        is cached in the local vars ), then the root runtime.

        Returns:
            The value, or defv if the variable is not found.
        '''
        item = self.vars.get(name, s_common.novalu)
        if item is not s_common.novalu:
            return item

        ctor = self.ctors.get(name)
        if ctor is not None:
            item = ctor(self)
            self.vars[name] = item
            return item

        if self.root is not None:
            valu = self.root.getVar(name, defv=s_common.novalu)
            if valu is not s_common.novalu:
                return valu

        return defv

    def _isRootScope(self, name):
        '''
        Return True if the named variable should be resolved in the root runtime.
        '''
        if self.root is None:
            return False
        if not self.funcscope:
            return True
        if name in self.root.vars:
            return True
        return self.root._isRootScope(name)

    async def _setVar(self, name, valu):
        # set a local var, releasing any previous Base value and taking a ref on the new one
        oldv = self.vars.get(name, s_common.novalu)
        if oldv is valu:
            return

        if isinstance(oldv, s_base.Base):
            await oldv.fini()

        if isinstance(valu, s_base.Base):
            valu.incref()

        self.vars[name] = valu

    async def setVar(self, name, valu):
        '''
        Set a variable in the local scope or, when applicable, in the root scope.
        '''
        if name in self.ctors or name in self.vars:
            await self._setVar(name, valu)
            return

        if self._isRootScope(name):
            return await self.root.setVar(name, valu)

        await self._setVar(name, valu)
        return

    async def popVar(self, name):
        '''
        Remove a variable and return its previous value.

        Returns:
            The previous value, or s_common.novalu if the variable was not set.
        '''
        if self._isRootScope(name):
            # the root popVar() is a coroutine and must be awaited to actually pop the value
            return await self.root.popVar(name)

        oldv = self.vars.pop(name, s_common.novalu)
        if isinstance(oldv, s_base.Base):
            await oldv.fini()

        return oldv

    def layerConfirm(self, perms):
        '''
        Confirm the permission on the write layer of the snap ( skipped when asroot ).
        '''
        if self.asroot:
            return
        iden = self.snap.wlyr.iden
        return self.user.confirm(perms, gateiden=iden)

    def isAdmin(self, gateiden=None):
        if self.asroot:
            return True
        return self.user.isAdmin(gateiden=gateiden)

    def reqAdmin(self, gateiden=None, mesg=None):
        if not self.asroot:
            self.user.reqAdmin(gateiden=gateiden, mesg=mesg)

    def confirm(self, perms, gateiden=None, default=None):
        '''
        Raise AuthDeny if the user doesn't have the permission.

        Notes:
            An elevated runtime with asroot=True skips the check and returns None.

            If no default is provided, the default from the permission
            definition is used ( or False if there is no definition ).

        Args:
            perms (tuple): The permission tuple.
            gateiden (str): The gateiden.
            default (bool): The default value.

        Returns:
            The result of the user confirm() call ( None when running asroot ).

        Raises:
            AuthDeny: If the user does not have the permission.
        '''
        if self.asroot:
            return

        if default is None:
            default = False

            permdef = self.snap.core.getPermDef(perms)
            if permdef:
                default = permdef.get('default', False)

        return self.user.confirm(perms, gateiden=gateiden, default=default)

    def allowed(self, perms, gateiden=None, default=None):
        '''
        Return True if the user has the permission ( always True when asroot ).

        If no default is provided, the default from the permission
        definition is used ( or False if there is no definition ).
        '''
        if self.asroot:
            return True

        if default is None:
            default = False

            permdef = self.snap.core.getPermDef(perms)
            if permdef:
                default = permdef.get('default', False)

        return self.user.allowed(perms, gateiden=gateiden, default=default)

    def allowedReason(self, perms, gateiden=None, default=None):
        '''
        Return the allowed reason for the permission ( the admin reason when asroot ).
        '''
        if self.asroot:
            return self._admin_reason

        return self.snap.core._propAllowedReason(self.user, perms, gateiden=gateiden, default=default)

    def confirmPropSet(self, prop, layriden=None):
        '''
        Confirm the user may set the property ( defaults to the write layer ).
        '''
        if self.asroot:
            return

        if layriden is None:
            layriden = self.snap.wlyr.iden

        return self.snap.core.confirmPropSet(self.user, prop, layriden=layriden)

    def confirmPropDel(self, prop, layriden=None):
        '''
        Confirm the user may delete the property ( defaults to the write layer ).
        '''
        if self.asroot:
            return

        if layriden is None:
            layriden = self.snap.wlyr.iden

        return self.snap.core.confirmPropDel(self.user, prop, layriden=layriden)

    def confirmEasyPerm(self, item, perm, mesg=None):
        if not self.asroot:
            self.snap.core._reqEasyPerm(item, self.user, perm, mesg=mesg)

    def allowedEasyPerm(self, item, perm):
        if self.asroot:
            return True
        return self.snap.core._hasEasyPerm(item, self.user, perm)

    def _loadRuntVars(self, query):
        # do a quick pass to determine which vars are per-node.
        for oper in query.kids:
            for name, isrunt in oper.getRuntVars(self):
                # once runtsafe, always runtsafe
                if self.runtvars.get(name):
                    continue
                self.runtvars[name] = isrunt

    def setGraph(self, gdef):
        '''
        Set the graph definition on the top-most runtime.
        '''
        if self.root is not None:
            self.root.setGraph(gdef)
        else:
            self.opts['graph'] = gdef

    def getGraph(self):
        '''
        Return the graph definition from the top-most runtime.
        '''
        if self.root is not None:
            return self.root.getGraph()
        return self.opts.get('graph')

    async def execute(self, genr=None):
        '''
        Execute the query and yield the resulting items.

        If the 'graph' option is set, the items are passed through a SubGraph.

        Raises:
            s_exc.RecursionLimitHit: If a RecursionError occurs during execution.
        '''
        try:
            async with contextlib.aclosing(self.query.iterNodePaths(self, genr=genr)) as nodegenr:
                nodegenr, empty = await s_ast.pullone(nodegenr)

                if empty:
                    return

                rules = self.opts.get('graph')
                if rules not in (False, None):
                    if rules is True:
                        rules = {'degrees': None, 'refs': True}
                    elif isinstance(rules, str):
                        # a string is resolved to a graph definition by the cortex
                        rules = await self.snap.core.getStormGraph(rules)

                    subgraph = s_ast.SubGraph(rules)
                    nodegenr = subgraph.run(self, nodegenr)

                async for item in nodegenr:
                    self.tick()
                    yield item

        except RecursionError:
            mesg = 'Maximum Storm pipeline depth exceeded.'
            raise s_exc.RecursionLimitHit(mesg=mesg, query=self.query.text) from None

    async def _snapFromOpts(self, opts):
        '''
        Return the snap to use for the opts.

        A new snap is created if opts specifies a view ( after confirming the
        user may read it ), otherwise the runtime snap is returned.

        Raises:
            s_exc.NoSuchView: If the view iden is not known to the cortex.
        '''
        snap = self.snap

        if opts is not None:

            viewiden = opts.get('view')

            if viewiden is not None:

                view = snap.core.views.get(viewiden)
                if view is None:
                    raise s_exc.NoSuchView(mesg=f'No such view iden={viewiden}', iden=viewiden)

                self.confirm(('view', 'read'), gateiden=viewiden)
                snap = await view.snap(self.user)

        return snap

    @contextlib.asynccontextmanager
    async def getSubRuntime(self, query, opts=None):
        '''
        Yield a runtime with shared scope that will populate changes upward.
        '''
        async with await self.initSubRuntime(query, opts=opts) as runt:
            yield runt

    async def initSubRuntime(self, query, opts=None):
        '''
        Construct and return sub-runtime with a shared scope.
        ( caller must fini )
        '''
        snap = await self._snapFromOpts(opts)

        runt = await Runtime.anit(query, snap, user=self.user, opts=opts, root=self)
        if self.debug:
            runt.debug = True
        runt.asroot = self.asroot
        runt.readonly = self.readonly

        return runt

    @contextlib.asynccontextmanager
    async def getCmdRuntime(self, query, opts=None):
        '''
        Yield a runtime with proper scoping for use in executing a pure storm command.
        '''
        async with await Runtime.anit(query, self.snap, user=self.user, opts=opts) as runt:
            if self.debug:
                runt.debug = True
            runt.asroot = self.asroot
            runt.readonly = self.readonly
            yield runt

    async def getModRuntime(self, query, opts=None):
        '''
        Construct a non-context managed runtime for use in module imports.
        '''
        runt = await Runtime.anit(query, self.snap, user=self.user, opts=opts)
        if self.debug:
            runt.debug = True
        runt.asroot = self.asroot
        runt.readonly = self.readonly
        return runt

    async def storm(self, text, opts=None, genr=None):
        '''
        Execute a storm runtime which inherits from this storm runtime.
        '''
        if opts is None:
            opts = {}
        query = await self.snap.core.getStormQuery(text)
        async with self.getSubRuntime(query, opts=opts) as runt:
            async for item in runt.execute(genr=genr):
                await asyncio.sleep(0)
                yield item

    async def getOneNode(self, propname, valu, filt=None, cmpr='='):
        '''
        Return exactly 1 node by <prop> <cmpr> <valu>

        Args:
            propname (str): The property name to lift by.
            valu: The value to compare against.
            filt: Optional async callback; nodes for which it returns a false value are skipped.
            cmpr (str): The comparator to lift with.

        Returns:
            The single matching node, or None if there is no match or the
            lift raises BadTypeValu.

        Raises:
            s_exc.StormRuntimeError: If more than one node matches.
        '''
        nodes = []
        try:

            async for node in self.snap.nodesByPropValu(propname, cmpr, valu):

                await asyncio.sleep(0)

                if filt is not None and not await filt(node):
                    continue

                if len(nodes) == 1:
                    # a second match makes the lookup ambiguous
                    mesg = f'Ambiguous value for single node lookup: {propname}{cmpr}{valu}'
                    raise s_exc.StormRuntimeError(mesg=mesg)

                nodes.append(node)

            if len(nodes) == 1:
                return nodes[0]

        except s_exc.BadTypeValu:
            return None
[docs]
class Parser:
    '''
    A small argparse-style argument parser used by Storm commands.

    The parser never prints or exits by itself. Usage and error text is queued
    in ``self.mesgs``; a parse failure is recorded in ``self.exc`` and
    ``self.exited`` is set whenever help() has been invoked, so the caller can
    decide how to surface the output.
    '''

    def __init__(self, prog=None, descr=None, root=None):

        if root is None:
            root = self

        self.prog = prog
        self.descr = descr

        self.exc = None

        self.root = root
        self.exited = False
        self.mesgs = []

        self.optargs = {}  # option name (e.g. "--foo") -> argument definition
        self.posargs = []  # ordered (name, argdef) tuples for positionals
        self.allargs = []  # every (names, argdef) tuple in declaration order

        self.inputs = None

        self.reqopts = []  # (names, argdef) tuples declared with required=True

        self.add_argument('--help', '-h', action='store_true', default=False, help='Display the command usage.')

    def set_inputs(self, idefs):
        '''
        Set the input form definitions listed in the "Inputs:" section of help().

        Args:
            idefs: An iterable of dicts; help() reads the ``form`` key and the
                optional ``help`` key of each.
        '''
        self.inputs = list(idefs)

    def add_argument(self, *names, **opts):
        '''
        Declare an argument.

        Args:
            *names: One or more names. Names starting with "-" are options, any
                other name is a positional argument.
            **opts: The argument definition. The keys read by this class are
                ``type``, ``choices``, ``action``, ``dest``, ``required``,
                ``nargs``, ``default`` and ``help``.

        Raises:
            s_exc.BadArg: If ``type`` is not a model type name, or ``choices``
                is combined with a store_true / store_false action.
        '''
        assert len(names)

        argtype = opts.get('type')
        if argtype is not None and argtype not in s_datamodel.Model().types:
            mesg = f'Argument type "{argtype}" is not a valid model type name'
            raise s_exc.BadArg(mesg=mesg, argtype=str(argtype))

        choices = opts.get('choices')
        if choices is not None and opts.get('action') in ('store_true', 'store_false'):
            mesg = 'Argument choices are not supported when action is store_true or store_false'
            raise s_exc.BadArg(mesg=mesg, argtype=str(argtype))

        dest = self._get_dest(names)
        opts.setdefault('dest', dest)
        self.allargs.append((names, opts))

        if opts.get('required'):
            self.reqopts.append((names, opts))

        for name in names:
            self._add_arg(name, opts)

    def _get_dest(self, names):
        # The longest name wins. sorted() is stable, so when several names share
        # the longest length the one declared last is chosen.
        names = [n.strip('-').replace('-', '_') for n in names]
        names = sorted(names, key=len)
        return names[-1]

    def _printf(self, *msgs):
        self.mesgs.extend(msgs)

    def _add_arg(self, name, opts):

        if name.startswith('-'):
            self.optargs[name] = opts
            return

        self.posargs.append((name, opts))

    def _is_opt(self, valu):
        if not isinstance(valu, str):
            return False
        return self.optargs.get(valu) is not None

    def parse_args(self, argv):
        '''
        Parse a list of argument values.

        Args:
            argv (list): The argument values. Non-string values are accepted and
                are treated as positional / nargs values.

        Returns:
            argparse.Namespace: The parsed options, or None if parsing stopped
            because help was requested or an error was recorded via help().
        '''
        posargs = []
        todo = collections.deque(argv)

        opts = {}

        while todo:

            item = todo.popleft()

            # non-string args must be positional or nargs to an optarg
            if not isinstance(item, str):
                posargs.append(item)
                continue

            argdef = self.optargs.get(item)
            if argdef is None:
                posargs.append(item)
                continue

            dest = argdef.get('dest')

            oact = argdef.get('action', 'store')
            if oact == 'store_true':
                opts[dest] = True
                continue

            if oact == 'store_false':
                opts[dest] = False
                continue

            if oact == 'append':

                vals = opts.get(dest)
                if vals is None:
                    vals = opts[dest] = []

                # parse into a scratch dict so the value can be appended rather
                # than overwriting earlier occurrences of the option
                fakeopts = {}
                if not self._get_store(item, argdef, todo, fakeopts):
                    return

                vals.append(fakeopts.get(dest))
                continue

            assert oact == 'store'
            if not self._get_store(item, argdef, todo, opts):
                return

        # check for help before processing other args
        if opts.pop('help', None):
            mesg = None
            if opts or posargs:
                # argv may hold non-string values, so stringify before joining
                argstr = ' '.join(str(arg) for arg in argv)
                mesg = f'Extra arguments and flags are not supported with the help flag: {self.prog} {argstr}'
            self.help(mesg)
            return

        # process positional arguments
        todo = collections.deque(posargs)

        for name, argdef in self.posargs:
            if not self._get_store(name, argdef, todo, opts):
                return

        if todo:
            delta = len(posargs) - len(todo)
            mesg = f'Expected {delta} positional arguments. Got {len(posargs)}: {posargs!r}'
            self.help(mesg)
            return

        for _, argdef in self.allargs:
            if 'default' in argdef:
                opts.setdefault(argdef['dest'], argdef['default'])

        for names, argdef in self.reqopts:
            dest = argdef.get('dest')
            if dest not in opts:
                namestr = ','.join(names)
                mesg = f'Missing a required option: {namestr}'
                self.help(mesg)
                return

        retn = argparse.Namespace()
        for name, valu in opts.items():
            setattr(retn, name, valu)

        return retn

    def _norm_valu(self, name, argdef, valu):
        '''
        Apply the optional ``type`` normalization and ``choices`` check to a value.

        Returns:
            tuple: (True, valu) on success. On failure help() has been called
            with the error message and (False, None) is returned.
        '''
        argtype = argdef.get('type')
        if argtype is not None:
            try:
                valu = s_datamodel.Model().type(argtype).norm(valu)[0]
            except Exception:
                mesg = f'Invalid value for type ({argtype}): {valu}'
                self.help(mesg=mesg)
                return False, None

        choices = argdef.get('choices')
        if choices is not None and valu not in choices:
            marg = name if name.startswith('-') else f'<{name}>'
            cstr = ', '.join(str(c) for c in choices)
            mesg = f'Invalid choice for argument {marg} (choose from: {cstr}): {valu}'
            self.help(mesg=mesg)
            return False, None

        return True, valu

    def _get_store(self, name, argdef, todo, opts):
        '''
        Consume the value(s) for one argument from ``todo`` into ``opts``.

        Returns:
            bool: True on success, False if an error was recorded via help().
        '''
        dest = argdef.get('dest')
        nargs = argdef.get('nargs')

        if nargs is None:

            if not todo or self._is_opt(todo[0]):
                if name.startswith('-'):
                    mesg = f'An argument is required for {name}.'
                else:
                    mesg = f'The argument <{name}> is required.'
                return self.help(mesg)

            isok, valu = self._norm_valu(name, argdef, todo.popleft())
            if not isok:
                return False

            opts[dest] = valu
            return True

        if nargs == '?':

            opts.setdefault(dest, argdef.get('default'))

            if todo and not self._is_opt(todo[0]):

                isok, valu = self._norm_valu(name, argdef, todo.popleft())
                if not isok:
                    return False

                opts[dest] = valu

            return True

        vals = []

        if nargs in ('*', '+'):

            while todo and not self._is_opt(todo[0]):

                isok, valu = self._norm_valu(name, argdef, todo.popleft())
                if not isok:
                    return False

                vals.append(valu)

            if nargs == '+' and len(vals) == 0:
                mesg = f'At least one argument is required for {name}.'
                return self.help(mesg)

            opts[dest] = vals
            return True

        # any other nargs value is an exact count
        for _ in range(nargs):

            if not todo or self._is_opt(todo[0]):
                mesg = f'{nargs} arguments are required for {name}.'
                return self.help(mesg)

            isok, valu = self._norm_valu(name, argdef, todo.popleft())
            if not isok:
                return False

            vals.append(valu)

        opts[dest] = vals
        return True

    def help(self, mesg=None):
        '''
        Queue the usage text and mark the parser as exited.

        Args:
            mesg (str): Optional error message. When given, a BadArg exception
                carrying it is stored in ``self.exc``.

        Returns:
            bool: Always False, so callers can ``return self.help(...)``.
        '''
        posnames = [f'<{name}>' for (name, argdef) in self.posargs]

        posargs = ' '.join(posnames)

        if self.descr is not None:
            self._printf('')
            self._printf(self.descr)
            self._printf('')

        self._printf(f'Usage: {self.prog} [options] {posargs}')

        options = [x for x in self.allargs if x[0][0].startswith('-')]

        self._printf('')
        self._printf('Options:')
        self._printf('')

        for names, argdef in options:
            self._print_optarg(names, argdef)

        if self.posargs:

            self._printf('')
            self._printf('Arguments:')
            self._printf('')

            for name, argdef in self.posargs:
                self._print_posarg(name, argdef)

        if self.inputs:
            self._printf('')
            self._printf('Inputs:')
            self._printf('')
            formsize = max([len(idef['form']) for idef in self.inputs])
            for idef in self.inputs:
                form = idef.get('form').ljust(formsize)
                text = f' {form}'
                desc = idef.get('help')
                if desc:
                    text += f' - {desc}'
                self._printf(text)

        if mesg is not None:
            self.exc = s_exc.BadArg(mesg=mesg)

        self.exited = True
        return False

    def _wrap_text(self, text, width):
        '''
        Greedily wrap whitespace separated words into lines.

        A word longer than ``width`` is placed on a line of its own rather than
        being split. Each word is budgeted as its length plus one separator.
        '''
        lines, curline, curlen = [], [], 0
        for word in text.split():
            # only break when there is something to flush; otherwise an overlong
            # first word would emit a spurious empty line
            if curline and curlen + len(word) + 1 > width:
                lines.append(' '.join(curline))
                curline, curlen = [word], len(word)
            else:
                curline.append(word)
                curlen += len(word) + 1
        if curline:
            lines.append(' '.join(curline))
        return lines

    def _print_optarg(self, names, argdef):

        dest = self._get_dest_str(argdef)
        oact = argdef.get('action', 'store')

        if oact in ('store_true', 'store_false'):
            base = f' {names[0]}'
        else:
            base = f' {names[0]} {dest}'

        defval = argdef.get('default', s_common.novalu)
        choices = argdef.get('choices')
        helpstr = argdef.get('help', 'No help available.')

        if defval is not s_common.novalu and oact not in ('store_true', 'store_false'):

            if isinstance(defval, (tuple, list, dict)):
                defval_ls = pprint.pformat(defval, width=120).split('\n')
                defval = '\n'.join(ln.strip() for ln in defval_ls)

            if choices is None:
                # multi-line defaults start on their own line
                if isinstance(defval, str) and '\n' in defval:
                    helpstr = f'{helpstr} (default: \n{defval})'
                else:
                    helpstr = f'{helpstr} (default: {defval})'
            else:
                cstr = ', '.join(str(c) for c in choices)
                helpstr = f'{helpstr} (default: {defval}, choices: {cstr})'

        elif choices is not None:
            cstr = ', '.join(str(c) for c in choices)
            helpstr = f'{helpstr} (choices: {cstr})'

        helplst = helpstr.split('\n')
        if helplst and not helplst[0].strip():
            helplst = helplst[1:]

        # common leading indent of the non-blank lines, removed before re-indenting
        min_space = min((len(ln) - len(ln.lstrip()) for ln in helplst if ln.strip()), default=0)

        base_w = 32
        wrap_w = 120 - base_w

        # blank help text leaves nothing to wrap; still emit the option line
        first = helplst[0][min_space:] if helplst else ''
        wrap_first = self._wrap_text(first, wrap_w) or ['']

        self._printf(f'{base:<{base_w-2}}: {wrap_first[0]}')
        for ln in wrap_first[1:]:
            self._printf(f'{"":<{base_w}}{ln}')

        for ln in helplst[1:]:
            lead_s = len(ln) - len(ln.lstrip())
            rel_ind = lead_s - min_space
            ind = ' ' * (base_w + rel_ind)
            wrapped = self._wrap_text(ln.lstrip(), wrap_w - rel_ind)
            for wl in wrapped:
                self._printf(f'{ind}{wl}')

    def _print_posarg(self, name, argdef):
        dest = self._get_dest_str(argdef)
        helpstr = argdef.get('help', 'No help available')

        choices = argdef.get('choices')
        if choices is not None:
            cstr = ', '.join(str(c) for c in choices)
            helpstr = f'{helpstr} (choices: {cstr})'

        base = f' {dest}'.ljust(30)
        self._printf(f'{base}: {helpstr}')

    def _get_dest_str(self, argdef):
        # Render the destination name decorated according to nargs.
        dest = argdef.get('dest')
        nargs = argdef.get('nargs')

        if nargs == '*':
            return f'[<{dest}> ...]'

        if nargs == '+':
            return f'<{dest}> [<{dest}> ...]'

        if nargs == '?':
            return f'[{dest}]'

        return f'<{dest}>'
[docs]
class Cmd:
    '''
    A one line description of the command.

    Command usage details and long form description.

    Example:

        cmd --help

    Notes:
        Python Cmd implementers may override the ``forms`` attribute with a dictionary to provide information
        about Synapse forms which are possible input and output nodes that a Cmd may recognize. A list of
        (key, form) tuples may also be added to provide information about forms which may have additional
        nodedata added to them by the Cmd.

        Example:

            ::

                {
                    'input': (
                        'inet:ipv4',
                        'tel:mob:telem',
                    ),
                    'output': (
                        'geo:place',
                    ),
                    'nodedata': (
                        ('foodata', 'inet:http:request'),
                        ('bardata', 'inet:ipv4'),
                    ),
                }

    '''
    name = 'cmd'
    pkgname = ''
    svciden = ''
    asroot = False
    readonly = False
    forms = {}  # type: ignore

    def __init__(self, runt, runtsafe):

        self.opts = None
        self.argv = None

        self.runt = runt
        self.runtsafe = runtsafe

        self.pars = self.getArgParser()
        self.pars.printf = runt.snap.printf

    def isReadOnly(self):
        '''Return the ``readonly`` flag of this command.'''
        return self.readonly

    @classmethod
    def getCmdBrief(cls):
        '''
        Return the first line of the class docstring.

        Class docstrings are not inherited, so a subclass declared without one
        has ``__doc__`` set to None; an empty string is returned in that case.
        '''
        return (cls.__doc__ or '').strip().split('\n')[0]

    def getName(self):
        '''Return the command name.'''
        return self.name

    def getDescr(self):
        '''Return the class docstring (may be None) used as the help description.'''
        return self.__class__.__doc__

    def getArgParser(self):
        '''Construct the Parser for this command; subclasses extend it with arguments.'''
        return Parser(prog=self.getName(), descr=self.getDescr())

    async def setArgv(self, argv):
        '''
        Parse argv into ``self.opts`` and print any queued parser messages.

        Returns:
            bool: False when the parser exited (help was shown), True otherwise.

        Raises:
            The exception recorded by the parser, if argument parsing failed.
        '''
        self.argv = argv

        try:
            self.opts = self.pars.parse_args(self.argv)
        except s_exc.BadSyntax:  # pragma: no cover
            pass

        for line in self.pars.mesgs:
            await self.runt.snap.printf(line)

        if self.pars.exc is not None:
            raise self.pars.exc

        return not self.pars.exited

    async def execStormCmd(self, runt, genr):  # pragma: no cover
        ''' Abstract base method '''
        raise s_exc.NoSuchImpl('Subclass must implement execStormCmd')
        # Unreachable, but the yield is what makes this an async generator
        # function like the subclass implementations.
        async for item in genr:
            yield item

    @classmethod
    def getStorNode(cls, form):
        '''
        Build the (buid, info) storage node tuple describing this command.

        Args:
            form: The form object used to normalize the command name and props.

        Returns:
            tuple: (buid, {'ndef': ndef, 'props': normalized props}).
        '''
        ndef = (form.name, form.type.norm(cls.name)[0])
        buid = s_common.buid(ndef)

        props = {
            'doc': cls.getCmdBrief()
        }

        inpt = cls.forms.get('input')
        outp = cls.forms.get('output')
        nodedata = cls.forms.get('nodedata')

        if inpt:
            props['input'] = tuple(inpt)

        if outp:
            props['output'] = tuple(outp)

        if nodedata:
            props['nodedata'] = tuple(nodedata)

        if cls.svciden:
            props['svciden'] = cls.svciden

        if cls.pkgname:
            props['package'] = cls.pkgname

        # only props which the form actually declares are normalized and kept
        pnorms = {}
        for prop, valu in props.items():
            formprop = form.props.get(prop)
            if formprop is not None and valu is not None:
                pnorms[prop] = formprop.type.norm(valu)[0]

        return (buid, {
            'ndef': ndef,
            'props': pnorms,
        })
[docs]
class PureCmd(Cmd):
    # A command whose implementation is storm text taken from a command
    # definition dictionary (cdef) rather than python code.

    # pure commands are all "readonly" safe because their perms are enforced
    # by the underlying runtime executing storm operations that are readonly
    # or not
    readonly = True

    def __init__(self, cdef, runt, runtsafe):
        # cdef must be in place first: Cmd.__init__ builds the arg parser,
        # which reads the command definition.
        self.cdef = cdef
        Cmd.__init__(self, runt, runtsafe)
        self.asroot = cdef.get('asroot', False)

    def getDescr(self):
        '''Return the description from the command definition.'''
        return self.cdef.get('descr', 'no documentation provided')

    def getName(self):
        '''Return the name from the command definition.'''
        return self.cdef.get('name')

    def getArgParser(self):
        '''Build the parser from the ``cmdargs`` and ``cmdinputs`` of the command definition.'''
        pars = Cmd.getArgParser(self)

        for argname, argopts in self.cdef.get('cmdargs', ()):
            pars.add_argument(argname, **argopts)

        inputs = self.cdef.get('cmdinputs')
        if inputs:
            pars.set_inputs(inputs)

        return pars

    async def execStormCmd(self, runt, genr):
        '''
        Run the storm text of the command definition in a command runtime.

        Raises:
            s_exc.AuthDeny: If the command elevates privileges and the user lacks
                the matching storm.asroot.cmd.<name> permission, or if none of
                the permissions listed in the definition is allowed.
        '''
        name = self.getName()

        rootperm = ('storm', 'asroot', 'cmd') + tuple(name.split('.'))
        asroot = runt.allowed(rootperm)

        if self.asroot and not asroot:
            mesg = f'Command ({name}) elevates privileges. You need perm: storm.asroot.cmd.{name}'
            raise s_exc.AuthDeny(mesg=mesg, user=runt.user.iden, username=runt.user.name)

        # if a command requires perms, check em!
        # ( used to create more intuitive perm boundaries )
        perms = self.cdef.get('perms')
        if perms is not None and not any(runt.allowed(perm) for perm in perms):
            permtext = ' or '.join(('.'.join(p) for p in perms))
            mesg = f'Command ({name}) requires permission: {permtext}'
            raise s_exc.AuthDeny(mesg=mesg, user=runt.user.iden, username=runt.user.name)

        text = self.cdef.get('storm')
        query = await runt.snap.core.getStormQuery(text)

        cmdopts = s_stormtypes.CmdOpts(self)

        opts = {
            'vars': {
                'cmdopts': cmdopts,
                'cmdconf': self.cdef.get('cmdconf', {}),
            }
        }

        async with runt.getCmdRuntime(query, opts=opts) as subr:

            subr.asroot = asroot

            if self.runtsafe:

                # vars of the most recently fed inbound path, restored onto
                # each path coming back out of the command runtime
                lastvars = {}

                async def inbound():
                    nonlocal lastvars
                    async for innode, inpath in genr:
                        lastvars = inpath.vars.copy()
                        inpath.initframe(initvars={'cmdopts': cmdopts})
                        yield innode, inpath

                async for node, path in subr.execute(genr=inbound()):
                    path.finiframe()
                    path.vars.update(lastvars)
                    yield node, path

            else:

                async def single(innode, inpath):
                    inpath.initframe(initvars={'cmdopts': cmdopts})
                    yield innode, inpath

                # not runtsafe: run the command query once per inbound node
                async for node, path in genr:

                    pathvars = path.vars.copy()

                    async for xnode, xpath in subr.execute(genr=single(node, path)):
                        xpath.finiframe()
                        xpath.vars.update(pathvars)
                        yield xnode, xpath
[docs]
class DivertCmd(Cmd):
    '''
    Either consume a generator or yield it's results based on a conditional.

    NOTE: This command is purpose built to facilitate the --yield convention
          common to storm commands.

    NOTE: The genr argument must not be a function that returns, else it will
          be invoked for each inbound node.

    Example:
        divert $cmdopts.yield $fooBarBaz()
    '''
    name = 'divert'

    def getArgParser(self):
        pars = Cmd.getArgParser(self)
        pars.add_argument('cond', help='The conditional value for the yield option.')
        pars.add_argument('genr', help='The generator function value that yields nodes.')
        pars.add_argument('--size', default=None, help='The max number of times to iterate the generator.')
        return pars

    async def execStormCmd(self, runt, genr):

        async def run(item):
            # Drain the generator argument for one inbound item (item is None
            # when there was no inbound node at all).
            if not isinstance(self.opts.genr, types.AsyncGeneratorType):
                raise s_exc.BadArg(mesg='The genr argument must yield nodes')

            size = await s_stormtypes.toint(self.opts.size, noneok=True)
            doyield = await s_stormtypes.tobool(self.opts.cond)

            try:
                count = 0

                async for genritem in self.opts.genr:

                    # either pass the generated item along or just consume it
                    if doyield:
                        yield genritem
                    else:
                        await asyncio.sleep(0)

                    count += 1
                    if size is not None and count >= size:
                        break

                # when not diverting, the inbound item continues down the pipeline
                if not doyield and item is not None:
                    yield item

            finally:
                await self.opts.genr.aclose()

        sawnode = False
        async for item in genr:
            sawnode = True
            async for runitem in run(item):
                yield runitem

        # with no inbound nodes the generator still gets run exactly once
        if not sawnode:
            async for runitem in run(None):
                yield runitem
[docs]
class BatchCmd(Cmd):
    '''
    Run a query with batched sets of nodes.

    The batched query will have the set of inbound nodes available in the
    variable $nodes.

    This command also takes a conditional as an argument. If the conditional
    evaluates to true, the nodes returned by the batched query will be yielded,
    if it evaluates to false, the inbound nodes will be yielded after executing the
    batched query.

    NOTE: This command is intended to facilitate use cases such as queries to external
          APIs with aggregate node values to reduce quota consumption. As this command
          interrupts the node stream, it should be used carefully to avoid unintended
          slowdowns in the pipeline.

    Example:

        // Execute a query with batches of 5 nodes, then yield the inbound nodes
        batch $lib.false --size 5 { $lib.print($nodes) }
    '''
    name = 'batch'

    def getArgParser(self):
        pars = Cmd.getArgParser(self)
        pars.add_argument('cond', help='The conditional value for the yield option.')
        pars.add_argument('query', help='The query to execute with batched nodes.')
        pars.add_argument('--size', default=10,
                          help='The number of nodes to collect before running the batched query (max 10000).')
        return pars

    async def execStormCmd(self, runt, genr):

        if not self.runtsafe:
            mesg = 'batch arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        size = await s_stormtypes.toint(self.opts.size)
        if size > 10000:
            mesg = f'Specified batch size ({size}) is above the maximum (10000).'
            raise s_exc.StormRuntimeError(mesg=mesg)

        query = await runt.getStormQuery(self.opts.query)
        doyield = await s_stormtypes.tobool(self.opts.cond)

        async with runt.getSubRuntime(query, opts={'vars': {'nodes': []}}) as subr:

            nodeset = []
            pathset = []

            async def runbatch():
                # Execute the batched query against the nodes collected so far.
                await subr.setVar('nodes', nodeset)

                last = None
                async for last in subr.execute():
                    await asyncio.sleep(0)
                    if doyield:
                        yield last

                if doyield:
                    return

                # yield the inbound nodes, carrying over the vars of the last
                # result the batched query produced (if any)
                for pair in zip(nodeset, pathset):
                    await asyncio.sleep(0)
                    if last is not None:
                        pair[1].vars.update(last[1].vars)
                    yield pair

            async for node, path in genr:

                nodeset.append(node)
                pathset.append(path)

                if len(nodeset) < size:
                    continue

                async for item in runbatch():
                    yield item

                nodeset.clear()
                pathset.clear()

            # flush the final partial batch
            if nodeset:
                async for item in runbatch():
                    yield item
[docs]
class HelpCmd(Cmd):
    '''
    List available information about Storm and brief descriptions of different items.

    Notes:

        If an item is provided, this can be a string or a function.

    Examples:

        // Get all available commands, libraries, types, and their brief descriptions.

        help

        // Only get commands which have "model" in the name.

        help model

        // Get help about the base Storm library

        help $lib

        // Get detailed help about a specific library or library function

        help --verbose $lib.print

        // Get detailed help about a named Storm type

        help --verbose str

        // Get help about a method from a $node object

        <inbound $node> help $node.tags

    '''
    name = 'help'

    def getArgParser(self):
        pars = Cmd.getArgParser(self)
        pars.add_argument('-v', '--verbose', default=False, action='store_true',
                          help='Display detailed help when available.')
        pars.add_argument('item', nargs='?',
                          help='List information about a subset of commands or a specific item.')
        return pars

    async def execStormCmd(self, runt, genr):

        node = None
        async for node, path in genr:
            await self._runHelp(runt)
            yield node, path

        # with no inbound nodes, help still runs once when the args are runtsafe
        if node is None and self.runtsafe:
            await self._runHelp(runt)

    async def _runHelp(self, runt: Runtime):
        '''
        Dispatch to the help renderer matching the type of the ``item`` argument.

        Raises:
            s_exc.BadArg: If the item is of an unsupported kind.
        '''
        item = self.opts.item

        if item is not None and \
                not isinstance(item, (str, s_node.Node, s_node.Path, s_stormtypes.StormType)) and \
                not callable(item):
            mesg = f'Item must be a Storm type name, a Storm library, or a Storm command name to search for. Got' \
                   f' {await s_stormtypes.totype(item, basetypes=True)}'
            raise s_exc.BadArg(mesg=mesg)

        # Lib is tested before StormType so libraries get the library renderer
        if isinstance(item, s_stormtypes.Lib):
            await self._handleLibHelp(item, runt, verbose=self.opts.verbose)
            return

        if isinstance(item, s_stormtypes.StormType):
            if item._storm_typename in s_stormtypes.registry.known_types:
                await self._handleTypeHelp(item._storm_typename, runt, verbose=self.opts.verbose)
                return
            # totype() is a coroutine function; it must be awaited to get the name
            styp = await s_stormtypes.totype(item, basetypes=True)
            raise s_exc.BadArg(mesg=f'Unknown storm type encountered: {styp}')

        if isinstance(item, s_node.Node):
            await self._handleTypeHelp('node', runt, verbose=self.opts.verbose)
            return

        if isinstance(item, s_node.Path):
            await self._handleTypeHelp('node:path', runt, verbose=self.opts.verbose)
            return

        # Handle $lib.inet.http.get / $str.split / $lib.gen.orgByName
        if callable(item):

            if hasattr(item, '__func__'):
                # https://docs.python.org/3/reference/datamodel.html#instance-methods
                await self._handleBoundMethod(item, runt, verbose=self.opts.verbose)
                return

            if hasattr(item, '_storm_runtime_lib_func'):
                await self._handleStormLibMethod(item, runt, verbose=self.opts.verbose)
                return

            styp = await s_stormtypes.totype(item, basetypes=True)
            if styp in ('telepath:proxy:method', 'telepath:proxy:genrmethod'):
                raise s_exc.BadArg(mesg='help does not support Telepath proxy methods.')

            raise s_exc.BadArg(mesg='help does not currently support runtime defined functions.')

        # item is None or a string: it may name a storm type and / or match commands
        foundtype = False
        if item in s_stormtypes.registry.known_types:
            foundtype = True
            await self._handleTypeHelp(item, runt, verbose=self.opts.verbose)

        return await self._handleGenericCommandHelp(item, runt, foundtype=foundtype)

    async def _handleGenericCommandHelp(self, item, runt, foundtype=False):
        '''
        Print the brief for every command whose name contains ``item``, grouped by package.
        '''
        stormcmds = sorted(runt.snap.core.getStormCmds())

        if item:
            stormcmds = [c for c in stormcmds if item in c[0]]
            if not stormcmds:
                if not foundtype:
                    await runt.printf(f'No commands found matching "{item}"')
                    return

        stormpkgs = await runt.snap.core.getStormPkgs()

        pkgsvcs = {}  # package name -> "service name (iden)"
        pkgcmds = {}  # package name -> list of formatted command lines
        pkgmap = {}   # command name -> package name

        for pkg in stormpkgs:
            svciden = pkg.get('svciden')
            pkgname = pkg.get('name')

            for cmd in pkg.get('commands', []):
                pkgmap[cmd.get('name')] = pkgname

            ssvc = runt.snap.core.getStormSvc(svciden)
            if ssvc is not None:
                pkgsvcs[pkgname] = f'{ssvc.name} ({svciden})'

        if stormcmds:

            # visually separate the command list from type help printed before it
            if foundtype:
                await runt.printf('')
                await runt.printf('*' * 80)
                await runt.printf('')

            await runt.printf('The following Storm commands are available:')

            maxlen = max(len(x[0]) for x in stormcmds)

            for name, ctor in stormcmds:
                cmdinfo = f'{name:<{maxlen}}: {ctor.getCmdBrief()}'
                pkgcmds.setdefault(pkgmap.get(name, 'synapse'), []).append(cmdinfo)

            # commands not provided by any package are listed first as "synapse"
            syncmds = pkgcmds.pop('synapse', [])
            if syncmds:

                await runt.printf('package: synapse')

                for cmd in syncmds:
                    await runt.printf(cmd)

                await runt.printf('')

            for name, cmds in sorted(pkgcmds.items()):
                svcinfo = pkgsvcs.get(name)

                if svcinfo:
                    await runt.printf(f'service: {svcinfo}')

                await runt.printf(f'package: {name}')

                for cmd in cmds:
                    await runt.printf(cmd)

                await runt.printf('')

            await runt.printf('For detailed help on any command, use <cmd> --help')

    async def _handleLibHelp(self, lib: s_stormtypes.Lib, runt: Runtime, verbose: bool =False):
        '''
        Print help for a library: its child libraries followed by its own docs.
        '''
        try:
            preamble = self._getChildLibs(lib)
        except s_exc.NoSuchName:
            raise s_exc.BadArg(mesg='Help does not currently support imported Storm modules.') from None

        page = s_autodoc.RstHelp()

        if hasattr(lib, '_storm_lib_path'):
            libsinfo = s_stormtypes.registry.getLibDocs(lib)

            s_autodoc.runtimeDocStormTypes(page, libsinfo,
                                           islib=True,
                                           oneline=not verbose,
                                           preamble=preamble,
                                           )
        else:
            page.addLines(*preamble)

        for line in page.lines:
            await runt.printf(line)

    def _getChildLibs(self, lib: s_stormtypes.Lib):
        '''
        Build the "available libraries" listing for every library nested under ``lib``.

        Returns:
            list: The lines to print; empty when no child library has a ctor.

        Raises:
            s_exc.NoSuchName: If the library name is not known to the cortex.
        '''
        corelibs = self.runt.snap.core.getStormLib(lib.name)
        if corelibs is None:
            raise s_exc.NoSuchName(mesg=f'Cannot find lib name [{lib.name}]')

        data = []
        lines = []

        libbase = ('lib',) + lib.name

        # breadth first walk of the library tree below lib
        todo = collections.deque()
        for child, lnfo in corelibs[1].items():
            todo.append(((child,), lnfo))

        while todo:
            child, lnfo = todo.popleft()
            path = libbase + child
            _, subs, cnfo = lnfo
            ctor = cnfo.get('ctor')
            if ctor:
                data.append((path, ctor))
            for sub, subnfo in subs.items():
                todo.append((child + (sub,), subnfo))

        if not data:
            return lines

        data = sorted(data, key=lambda x: x[0])

        lines.append('The following libraries are available:')
        lines.append('')
        for path, ctor in data:
            name = f'${".".join(path)}'
            desc = ctor.__doc__.strip().split('\n')[0]
            lines.append(f'{name.ljust(30)}: {desc}')
        lines.append('')

        return lines

    async def _handleTypeHelp(self, styp: str, runt: Runtime, verbose: bool =False):
        '''
        Print the docs registered for the named Storm type.
        '''
        typeinfo = s_stormtypes.registry.getTypeDocs(styp)
        page = s_autodoc.RstHelp()

        s_autodoc.runtimeDocStormTypes(page, typeinfo,
                                       islib=False,
                                       oneline=not verbose,
                                       )
        for line in page.lines:
            await runt.printf(line)

    @staticmethod
    def _localsByFuncName(locls, fname):
        # Keep only the doc locals whose type info is a dict naming fname as the
        # implementing function. Non-function locals may carry a non-dict type
        # and are skipped.
        retn = []
        for locl in locls:
            ltyp = locl.get('type')
            if not isinstance(ltyp, dict):
                continue
            if ltyp.get('_funcname', '') == fname:
                retn.append(locl)
        return retn

    async def _handleBoundMethod(self, func, runt: Runtime, verbose: bool =False):
        '''
        Print the docs for a python bound method of a Lib or Prim object.
        '''
        # Bound methods must be bound to a Lib or Prim object.
        # Determine what they are, get those docs exactly, and then render them.

        cls = func.__self__
        fname = func.__name__

        if isinstance(cls, s_stormtypes.Lib):
            libsinfo = s_stormtypes.registry.getLibDocs(cls)
            for lifo in libsinfo:
                lifo['locals'] = self._localsByFuncName(lifo['locals'], fname)
                if len(lifo['locals']) == 0:
                    await runt.warn(f'Unable to find doc for {func}')

            page = s_autodoc.RstHelp()

            s_autodoc.runtimeDocStormTypes(page, libsinfo,
                                           islib=True,
                                           addheader=False,
                                           oneline=not verbose,
                                           )
            for line in page.lines:
                await runt.printf(line)

        elif isinstance(cls, s_stormtypes.Prim):
            typeinfo = s_stormtypes.registry.getTypeDocs(cls._storm_typename)
            for lifo in typeinfo:
                lifo['locals'] = self._localsByFuncName(lifo['locals'], fname)
                if len(lifo['locals']) == 0:
                    await runt.warn(f'Unable to find doc for {func}')

            page = s_autodoc.RstHelp()

            s_autodoc.runtimeDocStormTypes(page, typeinfo,
                                           islib=False,
                                           oneline=not verbose,
                                           )
            for line in page.lines:
                await runt.printf(line)

        else:  # pragma: no cover
            raise s_exc.StormRuntimeError(mesg=f'Unknown bound method {func}')

    async def _handleStormLibMethod(self, func, runt: Runtime, verbose: bool =False):
        '''
        Print the docs for a function defined by a Storm library definition.
        '''
        # Storm library methods must be derived from a library definition.
        # Determine the parent lib and get those docs exactly, and then render them.

        cls = getattr(func, '_storm_runtime_lib', None)
        fname = getattr(func, '_storm_runtime_lib_func', None)

        if isinstance(cls, s_stormtypes.Lib):
            libsinfo = s_stormtypes.registry.getLibDocs(cls)
            for lifo in libsinfo:
                lifo['locals'] = [locl for locl in lifo['locals'] if locl.get('name') == fname]
                if len(lifo['locals']) == 0:
                    await runt.warn(f'Unable to find doc for {func}')

            page = s_autodoc.RstHelp()

            s_autodoc.runtimeDocStormTypes(page, libsinfo,
                                           islib=True,
                                           addheader=False,
                                           oneline=not verbose,
                                           )
            for line in page.lines:
                await runt.printf(line)

        else:  # pragma: no cover
            raise s_exc.StormRuntimeError(mesg=f'Unknown runtime lib method {func} {cls} {fname}')
[docs]
class DiffCmd(Cmd):
    '''
    Generate a list of nodes with changes in the top layer of the current view.

    Examples:

        // Lift all nodes with any changes

        diff

        // Lift ou:org nodes that were added in the top layer.

        diff --prop ou:org

        // Lift inet:ipv4 nodes with the :asn property modified in the top layer.

        diff --prop inet:ipv4:asn

        // Lift the nodes with the tag #cno.mal.redtree added in the top layer.

        diff --tag cno.mal.redtree

        // Lift nodes by multiple tags (results are uniqued)

        diff --tag cno.mal.redtree rep.vt
    '''
    name = 'diff'
    readonly = True

    def getArgParser(self):
        pars = Cmd.getArgParser(self)
        pars.add_argument('--tag', default=None, nargs='*',
                          help='Lift only nodes with the given tag (or tags) in the top layer.')
        pars.add_argument('--prop', default=None, help='Lift nodes with changes to the given property the top layer.')
        return pars

    async def execStormCmd(self, runt, genr):

        if runt.snap.view.parent is None:
            mesg = 'You may only generate a diff in a forked view.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        # inbound nodes are passed through before any diff nodes are lifted
        async for inbound in genr:
            yield inbound

        if self.opts.tag and self.opts.prop:
            mesg = 'You may specify --tag *or* --prop but not both.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        if self.opts.tag:

            tagnames = []
            for tag in self.opts.tag:
                tagnames.append(await s_stormtypes.tostr(tag))

            toplayr = runt.snap.view.layers[0]

            async for _, buid, sode in toplayr.liftByTags(tagnames):
                node = await self.runt.snap._joinStorNode(buid, {toplayr.iden: sode})
                if node is None:
                    continue
                yield node, runt.initPath(node)

            return

        if self.opts.prop:

            propname = await s_stormtypes.tostr(self.opts.prop)

            prop = self.runt.snap.core.model.prop(propname)
            if prop is None:
                mesg = f'The property {propname} does not exist.'
                raise s_exc.NoSuchProp(mesg=mesg)

            # work out the (form, prop) pair to lift by in the top layer
            if prop.isform:
                liftform, liftprop = prop.name, None
            elif prop.isuniv:
                liftform, liftprop = None, prop.name
            else:
                liftform, liftprop = prop.form.name, prop.name

            toplayr = runt.snap.view.layers[0]

            async for _, buid, sode in toplayr.liftByProp(liftform, liftprop):
                node = await self.runt.snap._joinStorNode(buid, {toplayr.iden: sode})
                if node is None:
                    continue
                yield node, runt.initPath(node)

            return

        # no filter given: every node with storage in the top layer
        async for buid, sode in runt.snap.view.layers[0].getStorNodes():
            node = await runt.snap.getNodeByBuid(buid)
            if node is None:
                continue
            yield node, runt.initPath(node)
[docs]
class CopyToCmd(Cmd):
    '''
    Copy nodes from the current view into another view.

    Examples:

        // Copy all nodes tagged with #cno.mal.redtree to the target view.

        #cno.mal.redtree | copyto 33c971ac77943da91392dadd0eec0571
    '''
    name = 'copyto'

    def getArgParser(self):
        pars = Cmd.getArgParser(self)
        pars.add_argument('--no-data', default=False, action='store_true',
                          help='Do not copy node data to the destination view.')
        pars.add_argument('view', help='The destination view ID to copy the nodes to.')
        return pars

    async def execStormCmd(self, runt, genr):

        if not self.runtsafe:
            mesg = 'copyto arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        iden = await s_stormtypes.tostr(self.opts.view)

        view = runt.snap.core.getView(iden)
        if view is None:
            raise s_exc.NoSuchView(mesg=f'No such view: {iden=}', iden=iden)

        runt.confirm(('view', 'read'), gateiden=view.iden)

        # all write permissions are checked against the destination view's top layer
        layriden = view.layers[0].iden

        async with await view.snap(user=runt.user) as snap:

            async for node, path in genr:

                # check every permission needed for this node before editing anything
                runt.confirm(node.form.addperm, gateiden=layriden)

                for propname in node.props.keys():
                    runt.confirmPropSet(node.form.props[propname], layriden=layriden)

                for tagname in node.tags.keys():
                    runt.confirm(('node', 'tag', 'add', *tagname.split('.')), gateiden=layriden)

                if not self.opts.no_data:
                    async for dataname in node.iterDataKeys():
                        runt.confirm(('node', 'data', 'set', dataname), gateiden=layriden)

                async with snap.getEditor() as editor:

                    proto = await editor.addNode(node.ndef[0], node.ndef[1])

                    for name, valu in node.props.items():

                        prop = node.form.prop(name)

                        if prop.info.get('ro'):

                            if name == '.created':
                                proto.props['.created'] = valu
                                continue

                            # an existing, different read only value is left alone
                            curv = proto.get(name)
                            if curv is not None and curv != valu:
                                valurepr = prop.type.repr(curv)
                                mesg = (f'Cannot overwrite read only property with conflicting '
                                        f'value: {node.iden()} {prop.full} = {valurepr}')
                                await runt.snap.warn(mesg)
                                continue

                        await proto.set(name, valu)

                    for name, valu in node.tags.items():
                        await proto.addTag(name, valu=valu)

                    for tagname, tagprops in node.tagprops.items():
                        for propname, valu in tagprops.items():
                            await proto.setTagProp(tagname, propname, valu)

                    if not self.opts.no_data:
                        async for name, valu in node.iterData():
                            await proto.setData(name, valu)

                    # verbs whose edge add permission has already been confirmed
                    # for this node (shared by both edge directions)
                    checked = set()

                    async for (verb, n2iden) in node.iterEdgesN1():

                        if verb not in checked:
                            runt.confirm(('node', 'edge', 'add', verb), gateiden=layriden)
                            checked.add(verb)

                        # skip edges whose target is not present in the destination view
                        n2node = await snap.getNodeByBuid(s_common.uhex(n2iden))
                        if n2node is None:
                            continue

                        await proto.addEdge(verb, n2iden)

                    # for the reverse edges, we'll need to make edits to the n1 node
                    async for (verb, n1iden) in node.iterEdgesN2():

                        if verb not in checked:
                            runt.confirm(('node', 'edge', 'add', verb), gateiden=layriden)
                            checked.add(verb)

                        n1proto = await editor.getNodeByBuid(s_common.uhex(n1iden))
                        if n1proto is not None:
                            await n1proto.addEdge(verb, s_common.ehex(node.buid))

                yield node, path
[docs]
class MergeCmd(Cmd):
'''
Merge edits from the incoming nodes down to the next layer.
NOTE: This command requires the current view to be a fork.
NOTE: The arguments for including/excluding tags can accept tag glob
expressions for specifying tags. For more information on tag glob
expressions, check the Synapse documentation for $node.globtags().
NOTE: If --wipe is specified, and there are nodes that cannot be merged,
they will be skipped (with a warning printed) and removed when
the top layer is replaced. This should occur infrequently, for example,
when a form is locked due to deprecation, a form no longer exists,
or the data at rest fails normalization.
Examples:
// Having tagged a new #cno.mal.redtree subgraph in a forked view...
#cno.mal.redtree | merge --apply
// Print out what the merge command *would* do but don't.
#cno.mal.redtree | merge
// Merge any org nodes with changes in the top layer.
diff | +ou:org | merge --apply
// Merge all tags other than cno.* from ou:org nodes with edits in the
// top layer.
diff | +ou:org | merge --only-tags --exclude-tags cno.** --apply
// Merge only tags rep.vt.* and rep.whoxy.* from ou:org nodes with edits
// in the top layer.
diff | +ou:org | merge --include-tags rep.vt.* rep.whoxy.* --apply
// Lift only inet:ipv4 nodes with a changed :asn property in top layer
// and merge all changes.
diff --prop inet:ipv4:asn | merge --apply
// Lift only nodes with an added #cno.mal.redtree tag in the top layer and merge them.
diff --tag cno.mal.redtree | merge --apply
'''
# The name used to invoke this command in a Storm pipeline (see the examples above).
name = 'merge'
[docs]
def getArgParser(self):
    '''
    Build the argument parser for the merge command.

    Starts from the parser returned by Cmd.getArgParser() and registers the
    merge specific switches on it.

    Returns:
        The parser returned by Cmd.getArgParser() with the merge options added.
    '''
    pars = Cmd.getArgParser(self)
    pars.add_argument('--apply', default=False, action='store_true',
                      help='Execute the merge changes.')
    pars.add_argument('--wipe', default=False, action='store_true',
                      help='Replace the top layer in the view with a fresh layer.')
    pars.add_argument('--no-tags', default=False, action='store_true',
                      help='Do not merge tags/tagprops or syn:tag nodes.')
    pars.add_argument('--only-tags', default=False, action='store_true',
                      help='Only merge tags/tagprops or syn:tag nodes.')
    pars.add_argument('--include-tags', default=[], nargs='*',
                      help='Include specific tags/tagprops or syn:tag nodes when '
                           'merging, others are ignored. Tag glob expressions may '
                           'be used to specify the tags.')
    # NOTE: the first fragment ends with a space so the two sentences do not
    # run together ("merge.Tag glob ...") in the rendered help text.
    pars.add_argument('--exclude-tags', default=[], nargs='*',
                      help='Exclude specific tags/tagprops or syn:tag nodes from merge. '
                           'Tag glob expressions may be used to specify the tags.')
    pars.add_argument('--include-props', default=[], nargs='*',
                      help='Include specific props when merging, others are ignored.')
    pars.add_argument('--exclude-props', default=[], nargs='*',
                      help='Exclude specific props from merge.')
    pars.add_argument('--diff', default=False, action='store_true',
                      help='Enumerate all changes in the current layer.')
    return pars
def _getTagFilter(self):
if self.opts.include_tags:
globs = s_cache.TagGlobs()
for name in self.opts.include_tags:
globs.add(name, True)
def tagfilter(tag):
if globs.get(tag):
return False
return True
return tagfilter
if self.opts.exclude_tags:
globs = s_cache.TagGlobs()
for name in self.opts.exclude_tags:
globs.add(name, True)
def tagfilter(tag):
if globs.get(tag):
return True
return False
return tagfilter
return None
def _getPropFilter(self):
if self.opts.include_props:
_include_props = set(self.opts.include_props)
def propfilter(prop):
return prop not in _include_props
return propfilter
if self.opts.exclude_props:
_exclude_props = set(self.opts.exclude_props)
def propfilter(prop):
return prop in _exclude_props
return propfilter
return None
async def _checkNodePerms(self, node, sode, runt, allows):
layr0 = runt.snap.view.layers[0].iden
layr1 = runt.snap.view.layers[1].iden
if not allows['forms'] and sode.get('valu') is not None:
if not self.opts.wipe:
runt.confirm(('node', 'del', node.form.name), gateiden=layr0)
runt.confirm(('node', 'add', node.form.name), gateiden=layr1)
if not allows['props']:
for name in sode.get('props', {}).keys():
prop = node.form.prop(name)
if not self.opts.wipe:
runt.confirmPropDel(prop, layriden=layr0)
runt.confirmPropSet(prop, layriden=layr1)
if not allows['tags']:
tags = []
tagadds = []
for tag, valu in sode.get('tags', {}).items():
if valu != (None, None):
tagadds.append(tag)
tagperm = tuple(tag.split('.'))
if not self.opts.wipe:
runt.confirm(('node', 'tag', 'del') + tagperm, gateiden=layr0)
runt.confirm(('node', 'tag', 'add') + tagperm, gateiden=layr1)
else:
tags.append((len(tag), tag))
for _, tag in sorted(tags, reverse=True):
look = tag + '.'
if any([tagadd.startswith(look) for tagadd in tagadds]):
continue
tagadds.append(tag)
tagperm = tuple(tag.split('.'))
if not self.opts.wipe:
runt.confirm(('node', 'tag', 'del') + tagperm, gateiden=layr0)
runt.confirm(('node', 'tag', 'add') + tagperm, gateiden=layr1)
for tag in sode.get('tagprops', {}).keys():
tagperm = tuple(tag.split('.'))
if not self.opts.wipe:
runt.confirm(('node', 'tag', 'del') + tagperm, gateiden=layr0)
runt.confirm(('node', 'tag', 'add') + tagperm, gateiden=layr1)
if not allows['ndata']:
async for name in runt.snap.view.layers[0].iterNodeDataKeys(node.buid):
if not self.opts.wipe:
runt.confirm(('node', 'data', 'pop', name), gateiden=layr0)
runt.confirm(('node', 'data', 'set', name), gateiden=layr1)
if not allows['edges']:
async for verb in runt.snap.view.layers[0].iterNodeEdgeVerbsN1(node.buid):
if not self.opts.wipe:
runt.confirm(('node', 'edge', 'del', verb), gateiden=layr0)
runt.confirm(('node', 'edge', 'add', verb), gateiden=layr1)
[docs]
async def execStormCmd(self, runt, genr):
# Merge the top layer edits of each inbound node down into the parent view.
#
# Without --apply nothing is stored: every pending change is printed with
# runt.printf(). With --apply the additions are stored to the parent view
# and, unless --wipe was given, matching delete edits are stored to the
# current view. With --apply and --wipe the view's top layer is swapped
# once all nodes have been processed.
#
# Raises:
#     s_exc.CantMergeView: If the current view has no parent view.
if runt.snap.view.parent is None:
mesg = 'You may only merge nodes in forked views'
raise s_exc.CantMergeView(mesg=mesg)
# --wipe needs admin on the view plus ('layer', 'del') on the top layer.
if self.opts.wipe:
mesg = 'merge --wipe requires view admin'
runt.reqAdmin(gateiden=runt.snap.view.iden, mesg=mesg)
runt.confirm(('layer', 'del'), gateiden=runt.snap.view.layers[0].iden)
notags = self.opts.no_tags
onlytags = self.opts.only_tags
doapply = self.opts.apply
# Both filters return True for names which must be skipped (or are None).
tagfilter = self._getTagFilter()
propfilter = self._getPropFilter()
layr0 = runt.snap.view.layers[0]
layr1 = runt.snap.view.layers[1]
# Permission checks are only considered when applying and the user is not
# an admin of both layers.
doperms = doapply and not (runt.isAdmin(gateiden=layr0.iden) and runt.isAdmin(gateiden=layr1.iden))
if doperms:
# Coarse per-category checks; with --wipe only the add/set side is consulted.
if not self.opts.wipe:
allows = {
'forms': runt.user.allowed(('node', 'del'), gateiden=layr0.iden, deepdeny=True) and
runt.user.allowed(('node', 'add'), gateiden=layr1.iden, deepdeny=True),
'props': runt.user.allowed(('node', 'prop', 'del'), gateiden=layr0.iden, deepdeny=True) and
runt.user.allowed(('node', 'prop', 'set'), gateiden=layr1.iden, deepdeny=True),
'tags': runt.user.allowed(('node', 'tag', 'del'), gateiden=layr0.iden, deepdeny=True) and
runt.user.allowed(('node', 'tag', 'add'), gateiden=layr1.iden, deepdeny=True),
'ndata': runt.user.allowed(('node', 'data', 'pop'), gateiden=layr0.iden, deepdeny=True) and
runt.user.allowed(('node', 'data', 'set'), gateiden=layr1.iden, deepdeny=True),
'edges': runt.user.allowed(('node', 'edge', 'del'), gateiden=layr0.iden, deepdeny=True) and
runt.user.allowed(('node', 'edge', 'add'), gateiden=layr1.iden, deepdeny=True),
}
else:
allows = {
'forms': runt.user.allowed(('node', 'add'), gateiden=layr1.iden, deepdeny=True),
'props': runt.user.allowed(('node', 'prop', 'set'), gateiden=layr1.iden, deepdeny=True),
'tags': runt.user.allowed(('node', 'tag', 'add'), gateiden=layr1.iden, deepdeny=True),
'ndata': runt.user.allowed(('node', 'data', 'set'), gateiden=layr1.iden, deepdeny=True),
'edges': runt.user.allowed(('node', 'edge', 'add'), gateiden=layr1.iden, deepdeny=True),
}
# If every category is allowed outright, the per-node checks are not needed.
doperms = not all(allows.values())
if self.opts.diff:
# Pass the inbound nodes through untouched, then merge every node which
# has a storage node in the top layer instead.
async for node, path in genr:
yield node, path
async def diffgenr():
async for buid, sode in layr0.getStorNodes():
node = await runt.snap.getNodeByBuid(buid)
if node is not None:
yield node, runt.initPath(node)
genr = diffgenr()
# Edits are built against a snap of the parent view running as the current
# user; its warnings are forwarded to this runtime's snap.
async with await runt.snap.view.parent.snap(user=runt.user.iden) as snap:
snap.strict = False
snap.on('warn', runt.snap.dist)
async for node, path in genr:
# the timestamp for the adds/subs of each node merge will match
nodeiden = node.iden()
meta = {'user': runt.user.iden, 'time': s_common.now()}
# sodes[0] is the top layer storage node; sodes[1:] are consulted as the layers below.
sodes = await node.getStorNodes()
sode = sodes[0]
if doapply:
editor = s_snap.SnapEditor(snap)
# delete edits to store to the current view once the additions are stored
subs = []
# check all node perms first
if doperms:
await self._checkNodePerms(node, sode, runt, allows)
form = node.form.name
if form == 'syn:tag':
if notags:
await asyncio.sleep(0)
continue
else:
# avoid merging a tag if the node won't exist below us
if onlytags:
for undr in sodes[1:]:
if undr.get('valu') is not None:
break
else:
await asyncio.sleep(0)
continue
protonode = None
delnode = False
if not onlytags or form == 'syn:tag':
valu = sode.get('valu')
if valu is not None:
if tagfilter is not None and form == 'syn:tag' and tagfilter(valu[0]):
await asyncio.sleep(0)
continue
if not doapply:
valurepr = node.form.type.repr(valu[0])
await runt.printf(f'{nodeiden} {form} = {valurepr}')
else:
delnode = True
if (protonode := await editor.addNode(form, valu[0])) is None:
await asyncio.sleep(0)
continue
elif doapply:
if (protonode := await editor.addNode(form, node.ndef[1], norminfo={})) is None:
await asyncio.sleep(0)
continue
for name, (valu, stortype) in sode.get('props', {}).items():
prop = node.form.prop(name)
# names starting with '.' are filtered by bare name, others by full property name
if propfilter is not None:
if name[0] == '.':
if propfilter(name):
continue
else:
if propfilter(prop.full):
continue
if prop.info.get('ro'):
if name == '.created':
if doapply:
protonode.props['.created'] = valu
if not self.opts.wipe:
subs.append((s_layer.EDIT_PROP_DEL, (name, valu, stortype), ()))
continue
# a read only property may only be merged when no lower layer
# holds a different value for it
isset = False
for undr in sodes[1:]:
props = undr.get('props')
if props is not None:
curv = props.get(name)
if curv is not None:
isset = curv[0] != valu
break
if isset:
valurepr = prop.type.repr(curv[0])
mesg = f'Cannot merge read only property with conflicting ' \
f'value: {nodeiden} {form}:{name} = {valurepr}'
await runt.snap.warn(mesg)
continue
if not doapply:
valurepr = prop.type.repr(valu)
await runt.printf(f'{nodeiden} {form}:{name} = {valurepr}')
else:
await protonode.set(name, valu)
if not self.opts.wipe:
subs.append((s_layer.EDIT_PROP_DEL, (name, valu, stortype), ()))
if doapply and protonode is None:
if (protonode := await editor.addNode(form, node.ndef[1], norminfo={})) is None:
await asyncio.sleep(0)
continue
if not notags:
for tag, valu in sode.get('tags', {}).items():
if tagfilter is not None and tagfilter(tag):
continue
if not doapply:
valurepr = ''
if valu != (None, None):
tagrepr = runt.model.type('ival').repr(valu)
valurepr = f' = {tagrepr}'
await runt.printf(f'{nodeiden} {form}#{tag}{valurepr}')
else:
await protonode.addTag(tag, valu)
if not self.opts.wipe:
subs.append((s_layer.EDIT_TAG_DEL, (tag, valu), ()))
for tag, tagdict in sode.get('tagprops', {}).items():
if tagfilter is not None and tagfilter(tag):
continue
for prop, (valu, stortype) in tagdict.items():
if not doapply:
valurepr = repr(valu)
await runt.printf(f'{nodeiden} {form}#{tag}:{prop} = {valurepr}')
else:
await protonode.setTagProp(tag, prop, valu)
if not self.opts.wipe:
subs.append((s_layer.EDIT_TAGPROP_DEL, (tag, prop, valu, stortype), ()))
if not onlytags or form == 'syn:tag':
async for name, valu in s_coro.pause(layr0.iterNodeData(node.buid)):
if not doapply:
valurepr = repr(valu)
await runt.printf(f'{nodeiden} {form} DATA {name} = {valurepr}')
else:
await protonode.setData(name, valu)
if not self.opts.wipe:
subs.append((s_layer.EDIT_NODEDATA_DEL, (name, valu), ()))
async for edge in s_coro.pause(layr0.iterNodeEdgesN1(node.buid)):
name, dest = edge
if not doapply:
await runt.printf(f'{nodeiden} {form} +({name})> {dest}')
else:
await protonode.addEdge(name, dest)
if not self.opts.wipe:
subs.append((s_layer.EDIT_EDGE_DEL, edge, ()))
# NOTE(review): `valu` is rebound by the props / tags / tagprops / node data
# loops above, so by this point it may no longer be the node's own
# sode 'valu' entry - confirm the layer ignores the value carried by
# EDIT_NODE_DEL, or capture the node value separately.
if delnode and not self.opts.wipe:
subs.append((s_layer.EDIT_NODE_DEL, valu, ()))
if doapply:
# additions go to the parent view first, then the deletes to the current view
addedits = editor.getNodeEdits()
if addedits:
await runt.snap.view.parent.storNodeEdits(addedits, meta=meta)
if subs:
subedits = [(node.buid, node.form.name, subs)]
await runt.snap.view.storNodeEdits(subedits, meta=meta)
# drop the cached node so the yielded node is lifted fresh after the edits
runt.snap.clearCachedNode(node.buid)
yield await runt.snap.getNodeByBuid(node.buid), path
if doapply and self.opts.wipe:
await runt.snap.view.swapLayer()
[docs]
class MoveNodesCmd(Cmd):
'''
Move storage nodes between layers.
Storage nodes will be removed from the source layers and the resulting
storage node in the destination layer will contain the merged values (merged
in bottom up layer order by default).
Examples:
// Move storage nodes for ou:org nodes to the top layer
ou:org | movenodes --apply
// Print out what the movenodes command *would* do but don't.
ou:org | movenodes
// In a view with many layers, only move storage nodes from the bottom layer
// to the top layer.
$layers = $lib.view.get().layers
$top = $layers.0.iden
$bot = $layers."-1".iden
ou:org | movenodes --srclayers $bot --destlayer $top
// In a view with many layers, move storage nodes to the top layer and
// prioritize values from the bottom layer over the other layers.
$layers = $lib.view.get().layers
$top = $layers.0.iden
$mid = $layers.1.iden
$bot = $layers.2.iden
ou:org | movenodes --precedence $bot $top $mid
'''
# The name used to invoke this command in a Storm pipeline (see the examples above).
name = 'movenodes'
[docs]
def getArgParser(self):
    '''
    Build the argument parser for the movenodes command.

    Returns:
        The parser returned by Cmd.getArgParser() with the movenodes options added.
    '''
    parser = Cmd.getArgParser(self)

    parser.add_argument(
        '--apply',
        default=False,
        action='store_true',
        help='Execute the move changes.',
    )
    parser.add_argument(
        '--srclayers',
        default=None,
        nargs='*',
        help='Specify layers to move storage nodes from (defaults to all below the top layer)',
    )
    parser.add_argument(
        '--destlayer',
        default=None,
        help='Layer to move storage nodes to (defaults to the top layer)',
    )
    parser.add_argument(
        '--precedence',
        default=None,
        nargs='*',
        help='Layer precedence for resolving conflicts (defaults to bottom up)',
    )

    return parser
async def _checkNodePerms(self, node, sodes, layrdata):
for layr, sode in sodes.items():
if layr == self.destlayr:
continue
if sode.get('valu') is not None:
self.runt.confirm(('node', 'del', node.form.name), gateiden=layr)
self.runt.confirm(('node', 'add', node.form.name), gateiden=self.destlayr)
for name, (valu, stortype) in sode.get('props', {}).items():
full = node.form.prop(name).full
self.runt.confirm(('node', 'prop', 'del', full), gateiden=layr)
self.runt.confirm(('node', 'prop', 'set', full), gateiden=self.destlayr)
for tag, valu in sode.get('tags', {}).items():
tagperm = tuple(tag.split('.'))
self.runt.confirm(('node', 'tag', 'del') + tagperm, gateiden=layr)
self.runt.confirm(('node', 'tag', 'add') + tagperm, gateiden=self.destlayr)
for tag, tagdict in sode.get('tagprops', {}).items():
for prop, (valu, stortype) in tagdict.items():
tagperm = tuple(tag.split('.'))
self.runt.confirm(('node', 'tag', 'del') + tagperm, gateiden=layr)
self.runt.confirm(('node', 'tag', 'add') + tagperm, gateiden=self.destlayr)
for name in layrdata[layr]:
self.runt.confirm(('node', 'data', 'pop', name), gateiden=layr)
self.runt.confirm(('node', 'data', 'set', name), gateiden=self.destlayr)
async for edge in self.lyrs[layr].iterNodeEdgesN1(node.buid):
verb = edge[0]
self.runt.confirm(('node', 'edge', 'del', verb), gateiden=layr)
self.runt.confirm(('node', 'edge', 'add', verb), gateiden=self.destlayr)
[docs]
async def execStormCmd(self, runt, genr):
    '''
    Move the storage nodes of each inbound node from the source layers to the destination layer.

    Without --apply the planned edits are only printed with runt.printf().
    With --apply, permissions are confirmed per node and the edits are stored
    to the destination and source layers. Every inbound node is re-lifted and
    yielded after it has been processed.

    Args:
        runt: The Storm runtime executing the command.
        genr: Async generator of inbound (node, path) tuples.

    Raises:
        s_exc.StormRuntimeError: If the arguments are not runtsafe, the view
            has fewer than two layers, or the destination layer is also a
            source layer.
        s_exc.BadOperArg: If a given layer iden is not part of the view, or
            --precedence does not include every source layer and the
            destination layer.
    '''
    if not self.runtsafe:
        mesg = 'movenodes arguments must be runtsafe.'
        raise s_exc.StormRuntimeError(mesg=mesg)

    if len(runt.snap.view.layers) < 2:
        mesg = 'You may only move nodes in views with multiple layers.'
        raise s_exc.StormRuntimeError(mesg=mesg)

    layridens = {layr.iden: layr for layr in runt.snap.view.layers}

    if self.opts.srclayers:
        srclayrs = self.opts.srclayers
        for layr in srclayrs:
            if layr not in layridens:
                mesg = f'No layer with iden {layr} in this view, cannot move nodes.'
                raise s_exc.BadOperArg(mesg=mesg, layr=layr)
    else:
        srclayrs = [layr.iden for layr in runt.snap.view.layers[1:]]

    if self.opts.destlayer:
        self.destlayr = self.opts.destlayer
        if self.destlayr not in layridens:
            mesg = f'No layer with iden {self.destlayr} in this view, cannot move nodes.'
            raise s_exc.BadOperArg(mesg=mesg, layr=self.destlayr)
    else:
        self.destlayr = runt.snap.view.layers[0].iden

    if self.destlayr in srclayrs:
        mesg = f'Source layer {self.destlayr} cannot also be the destination layer.'
        raise s_exc.StormRuntimeError(mesg=mesg)

    self.adds = []   # pending edits for the destination layer
    self.subs = {}   # source layer iden -> pending delete edits
    self.lyrs = {}   # participating layer iden -> layer, in precedence order
    self.runt = runt

    if self.opts.precedence:
        layrlist = srclayrs + [self.destlayr]

        for layr in self.opts.precedence:
            if layr not in layridens:
                mesg = f'No layer with iden {layr} in this view, cannot be used to specify precedence.'
                raise s_exc.BadOperArg(mesg=mesg, layr=layr)

            # A valid view layer which is not taking part in the move (or one
            # listed twice) is tolerated here, matching the filtering loop
            # below; an unguarded list.remove() would raise a bare ValueError.
            if layr in layrlist:
                layrlist.remove(layr)

        if len(layrlist) > 0:
            mesg = 'All source layers and the destination layer must be included when ' \
                   f'specifying precedence (missing {layrlist}).'
            raise s_exc.BadOperArg(mesg=mesg, layrlist=layrlist)

        layerord = self.opts.precedence
    else:
        layerord = layridens.keys()

    for layr in layerord:
        if layr == self.destlayr or layr in srclayrs:
            self.lyrs[layr] = layridens[layr]

        if layr in srclayrs:
            self.subs[layr] = []

    async for node, path in genr:

        # the timestamp for the adds/subs of each node merge will match
        nodeiden = node.iden()
        meta = {'user': runt.user.iden, 'time': s_common.now()}

        # get nodedata keys per layer
        sodes = {}
        layrdata = {}
        for layr in self.lyrs.keys():
            sodes[layr] = await self.lyrs[layr].getStorNode(node.buid)
            layrkeys = set()
            async for name in self.lyrs[layr].iterNodeDataKeys(node.buid):
                layrkeys.add(name)
            layrdata[layr] = layrkeys

        # check all perms
        if self.opts.apply:
            await self._checkNodePerms(node, sodes, layrdata)

        # node deletes are deferred until everything else has been moved
        delnodes = []
        for layr, sode in sodes.items():
            if layr == self.destlayr:
                continue

            valu = sode.get('valu')
            if valu is not None:
                valurepr = node.form.type.repr(valu[0])
                if not self.opts.apply:
                    await runt.printf(f'{self.destlayr} add {nodeiden} {node.form.name} = {valurepr}')
                    await runt.printf(f'{layr} delete {nodeiden} {node.form.name} = {valurepr}')
                else:
                    self.adds.append((s_layer.EDIT_NODE_ADD, valu, ()))
                    delnodes.append((layr, valu))

        await self._moveProps(node, sodes, meta)
        await self._moveTags(node, sodes, meta)
        await self._moveTagProps(node, sodes, meta)
        await self._moveNodeData(node, layrdata, meta)
        await self._moveEdges(node, meta)

        for layr, valu in delnodes:
            edit = [(node.buid, node.form.name, [(s_layer.EDIT_NODE_DEL, valu, ())])]
            await self.lyrs[layr].storNodeEdits(edit, meta=meta)

        # drop the live node so the yielded node is lifted fresh
        runt.snap.livenodes.pop(node.buid, None)
        yield await runt.snap.getNodeByBuid(node.buid), path
async def _sync(self, node, meta):
if not self.opts.apply:
return
if self.adds:
addedits = [(node.buid, node.form.name, self.adds)]
await self.lyrs[self.destlayr].storNodeEdits(addedits, meta=meta)
self.adds.clear()
for srclayr, edits in self.subs.items():
if edits:
subedits = [(node.buid, node.form.name, edits)]
await self.lyrs[srclayr].storNodeEdits(subedits, meta=meta)
edits.clear()
async def _moveProps(self, node, sodes, meta):
    '''
    Queue (or print) the edits which move property values into the destination layer.

    Layers are visited in the order of ``sodes``; for a given property only the
    first layer seen supplies the value, except for interval / min-time /
    max-time storage types which are set from every source layer.

    Args:
        node: The node being moved.
        sodes (dict): Layer iden -> storage node dict for ``node``.
        meta (dict): Edit metadata handed to _sync().
    '''
    pending = 0
    seen = set()
    formname = node.form.name
    iden = node.iden()

    # presumably these storage types merge on set rather than overwrite - verify against the layer
    everytime = (s_layer.STOR_TYPE_IVAL, s_layer.STOR_TYPE_MINTIME, s_layer.STOR_TYPE_MAXTIME)

    for layriden, sode in sodes.items():

        fromsrc = layriden != self.destlayr

        for name, (valu, stortype) in sode.get('props', {}).items():

            if fromsrc and (stortype in everytime or name not in seen):
                if self.opts.apply:
                    self.adds.append((s_layer.EDIT_PROP_SET, (name, valu, None, stortype), ()))
                    pending += 1
                else:
                    valurepr = node.form.prop(name).type.repr(valu)
                    await self.runt.printf(f'{self.destlayr} set {iden} {formname}:{name} = {valurepr}')

            # recorded for destination layer values too, so they are not overwritten later
            seen.add(name)

            if fromsrc:
                if self.opts.apply:
                    self.subs[layriden].append((s_layer.EDIT_PROP_DEL, (name, None, stortype), ()))
                    pending += 1
                else:
                    valurepr = node.form.prop(name).type.repr(valu)
                    await self.runt.printf(f'{layriden} delete {iden} {formname}:{name} = {valurepr}')

            # store in batches so the pending lists stay bounded
            if pending >= 1000:
                await self._sync(node, meta)
                pending = 0

    await self._sync(node, meta)
async def _moveTags(self, node, sodes, meta):
ecnt = 0
form = node.form.name
nodeiden = node.iden()
for layr, sode in sodes.items():
for tag, valu in sode.get('tags', {}).items():
if not layr == self.destlayr:
if not self.opts.apply:
valurepr = ''
if valu != (None, None):
tagrepr = self.runt.model.type('ival').repr(valu)
valurepr = f' = {tagrepr}'
await self.runt.printf(f'{self.destlayr} set {nodeiden} {form}#{tag}{valurepr}')
await self.runt.printf(f'{layr} delete {nodeiden} {form}#{tag}{valurepr}')
else:
self.adds.append((s_layer.EDIT_TAG_SET, (tag, valu, None), ()))
self.subs[layr].append((s_layer.EDIT_TAG_DEL, (tag, None), ()))
ecnt += 2
if ecnt >= 1000:
await self._sync(node, meta)
ecnt = 0
await self._sync(node, meta)
async def _moveTagProps(self, node, sodes, meta):
    '''
    Queue (or print) the edits which move tag property values into the destination layer.

    Layers are visited in the order of ``sodes``; for a given (tag, prop) only
    the first layer seen supplies the value, except for interval / min-time /
    max-time storage types which are set from every source layer.

    Args:
        node: The node being moved.
        sodes (dict): Layer iden -> storage node dict for ``node``.
        meta (dict): Edit metadata handed to _sync().
    '''
    pending = 0
    seen = set()
    formname = node.form.name
    iden = node.iden()

    # presumably these storage types merge on set rather than overwrite - verify against the layer
    everytime = (s_layer.STOR_TYPE_IVAL, s_layer.STOR_TYPE_MINTIME, s_layer.STOR_TYPE_MAXTIME)

    for layriden, sode in sodes.items():

        fromsrc = layriden != self.destlayr

        for tag, tagdict in sode.get('tagprops', {}).items():
            for prop, (valu, stortype) in tagdict.items():

                key = (tag, prop)

                if fromsrc and (stortype in everytime or key not in seen):
                    if self.opts.apply:
                        self.adds.append((s_layer.EDIT_TAGPROP_SET, (tag, prop, valu, None, stortype), ()))
                        pending += 1
                    else:
                        valurepr = repr(valu)
                        mesg = f'{self.destlayr} set {iden} {formname}#{tag}:{prop} = {valurepr}'
                        await self.runt.printf(mesg)

                # recorded for destination layer values too, so they are not overwritten later
                seen.add(key)

                if fromsrc:
                    if self.opts.apply:
                        self.subs[layriden].append((s_layer.EDIT_TAGPROP_DEL, (tag, prop, None, stortype), ()))
                        pending += 1
                    else:
                        valurepr = repr(valu)
                        await self.runt.printf(f'{layriden} delete {iden} {formname}#{tag}:{prop} = {valurepr}')

                # store in batches so the pending lists stay bounded
                if pending >= 1000:
                    await self._sync(node, meta)
                    pending = 0

    await self._sync(node, meta)
async def _moveNodeData(self, node, layrdata, meta):
ecnt = 0
movekeys = set()
form = node.form.name
nodeiden = node.iden()
for layr in self.lyrs.keys():
for name in layrdata[layr]:
if name not in movekeys and not layr == self.destlayr:
if not self.opts.apply:
await self.runt.printf(f'{self.destlayr} set {nodeiden} {form} DATA {name}')
else:
(retn, valu) = await self.lyrs[layr].getNodeData(node.buid, name)
if retn:
self.adds.append((s_layer.EDIT_NODEDATA_SET, (name, valu, None), ()))
ecnt += 1
await asyncio.sleep(0)
movekeys.add(name)
if not layr == self.destlayr:
if not self.opts.apply:
await self.runt.printf(f'{layr} delete {nodeiden} {form} DATA {name}')
else:
self.subs[layr].append((s_layer.EDIT_NODEDATA_DEL, (name, None), ()))
ecnt += 1
if ecnt >= 1000:
await self._sync(node, meta)
ecnt = 0
await self._sync(node, meta)
async def _moveEdges(self, node, meta):
ecnt = 0
form = node.form.name
nodeiden = node.iden()
for iden, layr in self.lyrs.items():
if not iden == self.destlayr:
async for edge in layr.iterNodeEdgesN1(node.buid):
if not self.opts.apply:
name, dest = edge
await self.runt.printf(f'{self.destlayr} add {nodeiden} {form} +({name})> {dest}')
await self.runt.printf(f'{iden} delete {nodeiden} {form} +({name})> {dest}')
else:
self.adds.append((s_layer.EDIT_EDGE_ADD, edge, ()))
self.subs[iden].append((s_layer.EDIT_EDGE_DEL, edge, ()))
ecnt += 2
if ecnt >= 1000:
await self._sync(node, meta)
ecnt = 0
await self._sync(node, meta)
[docs]
class LimitCmd(Cmd):
    '''
    Limit the number of nodes generated by the query in the given position.
    Example:
    inet:ipv4 | limit 10
    '''
    name = 'limit'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser for the limit command.

        Returns:
            The parser returned by Cmd.getArgParser() with the ``count`` argument added.
        '''
        pars = Cmd.getArgParser(self)
        pars.add_argument('count', type='int', help='The maximum number of nodes to yield.')
        return pars

    async def execStormCmd(self, runt, genr):
        '''
        Yield at most ``count`` of the inbound items.

        A count of zero (or less) yields nothing. Iteration stops as soon as
        the limit is reached, so no item beyond the limit is pulled from
        ``genr``.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.
        '''
        count = 0
        async for item in genr:

            # Checked before yielding so that "limit 0" does not leak one node.
            # The option is read per item (not hoisted above the loop) in case
            # the argument is only populated once a node has been pulled.
            if count >= self.opts.count:
                break

            yield item
            count += 1

            # Stop right away rather than pulling one more item from upstream.
            if count >= self.opts.count:
                break
[docs]
class UniqCmd(Cmd):
    '''
    Filter nodes by their uniq iden values.
    When this is used a Storm pipeline, only the first instance of a
    given node is allowed through the pipeline.
    A relative property or variable may also be specified, which will cause
    this command to only allow through the first node with a given value for
    that property or value rather than checking the node iden.
    Examples:
    # Filter duplicate nodes after pivoting from inet:ipv4 nodes tagged with #badstuff
    #badstuff +inet:ipv4 ->* | uniq
    # Unique inet:ipv4 nodes by their :asn property
    #badstuff +inet:ipv4 | uniq :asn
    '''
    name = 'uniq'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser for the uniq command.

        Returns:
            The parser returned by Cmd.getArgParser() with the optional ``value`` argument added.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('value', nargs='?', help='A relative property or variable to uniq by.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Yield only the first inbound node for each distinct key.

        The key is the hash of the ``value`` argument when any argument was
        given, otherwise the node buid.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.
        '''
        async with await s_spooled.Set.anit(dirn=self.runt.snap.core.dirn) as seen:

            byvalu = len(self.argv) > 0

            async for node, path in genr:

                if byvalu:
                    prim = await s_stormtypes.toprim(self.opts.value)
                    key = s_hashitem.hashitem(prim)
                else:
                    key = node.buid

                if key in seen:
                    # all filters must sleep
                    await asyncio.sleep(0)
                    continue

                await seen.add(key)
                yield node, path
[docs]
class MaxCmd(Cmd):
    '''
    Consume nodes and yield only the one node with the highest value for an expression.
    Examples:
    // Yield the file:bytes node with the highest :size property
    file:bytes#foo.bar | max :size
    // Yield the file:bytes node with the highest value for $tick
    file:bytes#foo.bar +.seen ($tick, $tock) = .seen | max $tick
    // Yield the it:dev:str node with the longest length
    it:dev:str | max $lib.len($node.value())
    '''
    name = 'max'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser for the max command.

        Returns:
            The parser returned by Cmd.getArgParser() with the ``valu`` argument added.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('valu', help='The property or variable to use for comparison.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Consume every inbound item and yield the one with the highest value.

        Items whose value is None or (None, None) are ignored. List / tuple
        values are normalized as an interval and compared by their second
        element. The first item seen wins a tie.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.
        '''
        highest = None
        winner = None

        ivaltype = self.runt.snap.core.model.type('ival')

        async for item in genr:

            valu = await s_stormtypes.toprim(self.opts.valu)
            if valu is None:
                continue

            if isinstance(valu, (list, tuple)):
                if valu == (None, None):
                    continue

                ival, _ = ivaltype.norm(valu)
                valu = ival[1]

            valu = s_stormtypes.intify(valu)

            if highest is None or valu > highest:
                highest, winner = valu, item

        if winner:
            yield winner
[docs]
class MinCmd(Cmd):
    '''
    Consume nodes and yield only the one node with the lowest value for an expression.
    Examples:
    // Yield the file:bytes node with the lowest :size property
    file:bytes#foo.bar | min :size
    // Yield the file:bytes node with the lowest value for $tick
    file:bytes#foo.bar +.seen ($tick, $tock) = .seen | min $tick
    // Yield the it:dev:str node with the shortest length
    it:dev:str | min $lib.len($node.value())
    '''
    name = 'min'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser for the min command.

        Returns:
            The parser returned by Cmd.getArgParser() with the ``valu`` argument added.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('valu', help='The property or variable to use for comparison.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Consume every inbound node and yield the one with the lowest value.

        Nodes whose value is None or (None, None) are ignored. List / tuple
        values are normalized as an interval and compared by their first
        element. The first node seen wins a tie.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.
        '''
        lowest = None
        winner = None

        ivaltype = self.runt.snap.core.model.type('ival')

        async for node, path in genr:

            valu = await s_stormtypes.toprim(self.opts.valu)
            if valu is None:
                continue

            if isinstance(valu, (list, tuple)):
                if valu == (None, None):
                    continue

                ival, _ = ivaltype.norm(valu)
                valu = ival[0]

            valu = s_stormtypes.intify(valu)

            if lowest is None or valu < lowest:
                lowest, winner = valu, (node, path)

        if winner:
            yield winner
[docs]
class DelNodeCmd(Cmd):
    '''
    Delete nodes produced by the previous query logic.
    (no nodes are returned)
    Example
    inet:fqdn=vertex.link | delnode
    '''
    name = 'delnode'

    def getArgParser(self):
        '''
        Build the argument parser for the delnode command.

        Returns:
            The parser returned by Cmd.getArgParser() with the delnode options added.
        '''
        parser = Cmd.getArgParser(self)

        forcehelp = 'Force delete even if it causes broken references (requires admin).'
        parser.add_argument('--force', default=False, action='store_true', help=forcehelp)

        parser.add_argument('--delbytes', default=False, action='store_true',
                            help='For file:bytes nodes, remove the bytes associated with the '
                                 'sha256 property from the axon as well if present.')

        parser.add_argument('--deledges', default=False, action='store_true',
                            help='Delete N2 light edges before deleting the node.')

        return parser

    async def _delEdgesN2(self, runt, node):
        '''
        Delete every N2 light edge pointing at ``node``.

        The delete permission is confirmed once per verb while the edges are
        collected; the edges are then removed through a snap editor whose
        edits are applied whenever 1000 or more proto nodes have accumulated.
        '''
        async with await s_spooled.Set.anit(dirn=self.runt.snap.core.dirn) as edges:

            confirmed = set()

            async for (verb, n1iden) in node.iterEdgesN2():
                if verb not in confirmed:
                    runt.layerConfirm(('node', 'edge', 'del', verb))
                    confirmed.add(verb)
                await edges.add((verb, n1iden))

            async with self.runt.snap.getEditor() as editor:
                async for (verb, n1iden) in edges:

                    n1 = await editor.getNodeByBuid(s_common.uhex(n1iden))
                    if n1 is None:
                        continue

                    if await n1.delEdge(verb, node.iden()) and len(editor.protonodes) >= 1000:
                        await self.runt.snap.applyNodeEdits(editor.getNodeEdits())
                        editor.protonodes.clear()

    async def execStormCmd(self, runt, genr):
        '''
        Delete every inbound node; no nodes are yielded.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.

        Raises:
            s_exc.AuthDeny: If --force is given, the runtime has a user, and
                runt.isAdmin() is false.
        '''
        force = await s_stormtypes.tobool(self.opts.force)
        delbytes = await s_stormtypes.tobool(self.opts.delbytes)
        deledges = await s_stormtypes.tobool(self.opts.deledges)

        if force and runt.user is not None and not runt.isAdmin():
            mesg = '--force requires admin privs.'
            raise s_exc.AuthDeny(mesg=mesg, user=self.runt.user.iden, username=self.runt.user.name)

        if delbytes:
            runt.confirm(('storm', 'lib', 'axon', 'del'))
            await runt.snap.core.getAxon()
            axon = runt.snap.core.axon

        async for node, path in genr:

            # make sure we can delete the tags...
            for tag in node.tags.keys():
                runt.layerConfirm(('node', 'tag', 'del', *tag.split('.')))

            runt.layerConfirm(('node', 'del', node.form.name))

            if deledges:
                await self._delEdgesN2(runt, node)

            if delbytes and node.form.name == 'file:bytes':
                sha256 = node.props.get('sha256')
                if sha256:
                    await axon.del_(s_common.uhex(sha256))

            await node.delete(force=force)

            await asyncio.sleep(0)

        # a bit odd, but we need to be detected as a generator
        if False:
            yield
[docs]
class ReIndexCmd(Cmd):
    '''
    Use admin privileges to re index/normalize node properties.
    NOTE: Currently does nothing but is reserved for future use.
    '''
    name = 'reindex'

    def getArgParser(self):
        '''
        Return the unmodified parser from Cmd.getArgParser(); reindex adds no arguments.
        '''
        return Cmd.getArgParser(self)

    async def execStormCmd(self, runt, genr):
        '''
        Emit a warning that the command does nothing; inbound nodes are neither consumed nor yielded.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples (unused).
        '''
        await runt.snap.warn('reindex currently does nothing but is reserved for future use')

        # Make this a generator
        if False:
            yield
[docs]
class MoveTagCmd(Cmd):
    '''
    Rename an entire tag tree and preserve time intervals.
    Example:
    movetag foo.bar baz.faz.bar
    '''
    name = 'movetag'

    def getArgParser(self):
        '''
        Build the argument parser for the movetag command.

        Returns:
            The parser returned by Cmd.getArgParser() with ``oldtag`` and ``newtag`` added.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('oldtag', help='The tag tree to rename.')
        parser.add_argument('newtag', help='The new tag tree name.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Rename the ``oldtag`` tree to ``newtag`` and then pass the inbound nodes through.

        The syn:tag nodes of the old tree get a matching node in the new tree
        (doc, doc:url, title and tags are copied) and have :isnow pointed at
        it. Every node tagged with the old tag is then re-tagged, keeping tag
        values and tag properties.

        Args:
            runt: The Storm runtime executing the command.
            genr: Async generator of inbound (node, path) tuples.

        Raises:
            s_exc.StormRuntimeError: If the arguments are not runtsafe.
            s_exc.BadOperArg: If the old tag does not exist, the old and new
                tags are the same, or following :isnow from the new tag
                reveals a cycle.
        '''
        if not self.runtsafe:
            mesg = 'movetag arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        snap = runt.snap

        found = await snap.nodes('syn:tag=$tag', opts={'vars': {'tag': self.opts.oldtag}})
        if not found:
            raise s_exc.BadOperArg(mesg='Cannot move a tag which does not exist.',
                                   oldtag=self.opts.oldtag)

        oldname = found[0].ndef[1]
        oldlen = len(oldname)
        oldparts = oldname.split('.')
        oldcount = len(oldparts)

        newnorm, newinfo = await snap.getTagNorm(await s_stormtypes.tostr(self.opts.newtag))
        newparts = newnorm.split('.')

        runt.layerConfirm(('node', 'tag', 'del', *oldparts))
        runt.layerConfirm(('node', 'tag', 'add', *newparts))

        rootnode = await snap.addNode('syn:tag', newnorm, norminfo=newinfo)
        newname = rootnode.ndef[1]

        if oldname == newname:
            raise s_exc.BadOperArg(mesg='Cannot retag a tag to the same valu.',
                                   newtag=newname, oldtag=oldname)

        # do some sanity checking on the new tag to make sure we're not creating a loop
        chain = [newname]
        isnow = rootnode.get('isnow')
        while isnow:
            if isnow in chain:
                raise s_exc.BadOperArg(mesg=f'Pre-existing cycle detected when moving {oldname} to tag {newname}',
                                       cycle=chain)
            chain.append(isnow)
            hop = await snap.addNode('syn:tag', isnow)
            isnow = hop.get('isnow')
            await asyncio.sleep(0)

        if oldname in chain:
            raise s_exc.BadOperArg(mesg=f'Tag cycle detected when moving tag {oldname} to tag {newname}',
                                   cycle=chain)

        # old tag name -> new tag name for every tag in the tree
        retag = {oldname: newname}

        # first we set all the syn:tag:isnow props
        prefix = self.opts.oldtag.strip('#')
        async for tagnode in snap.nodesByPropValu('syn:tag', '^=', prefix):

            tagname = tagnode.ndef[1]

            # Are we in the same tree?
            if tagname.split('.')[:oldcount] != oldparts:
                continue

            moved = newname + tagname[oldlen:]
            movednode = await snap.addNode('syn:tag', moved)

            # carry the descriptive props over when they are set
            for propname in ('doc', 'doc:url', 'title'):
                propvalu = tagnode.get(propname)
                if propvalu is not None:
                    await movednode.set(propname, propvalu)

            # Copy any tags over to the newnode if any are present.
            for tagk, tagv in tagnode.tags.items():
                await movednode.addTag(tagk, tagv)
                await asyncio.sleep(0)

            retag[tagname] = moved
            await tagnode.set('isnow', moved)

        # now we re-tag all the nodes...
        count = 0
        async for node in snap.nodesByTag(oldname):

            count += 1

            # reverse sorted so tags deeper in the tree are handled before their parents
            nodetags = sorted(node.tags.items(), reverse=True)

            for name, valu in nodetags:

                target = retag.get(name)
                if target is None:
                    await asyncio.sleep(0)
                    continue

                # Capture tagprop information before moving tags
                tagprops = {tagp: node.getTagProp(name, tagp) for tagp in node.getTagProps(name)}

                # Move the tags
                await node.delTag(name)
                await node.addTag(target, valu=valu)

                # re-apply any captured tagprop data
                for tagp, tagpvalu in tagprops.items():
                    await node.setTagProp(target, tagp, tagpvalu)

        await snap.printf(f'moved tags on {count} nodes.')

        async for node, path in genr:
            yield node, path
[docs]
class SpinCmd(Cmd):
    '''
    Iterate through all query results, but do not yield any.

    This can be used to operate on many nodes without returning any.

    Example:

        foo:bar:size=20 [ +#hehe ] | spin
    '''
    readonly = True
    name = 'spin'

    async def execStormCmd(self, runt, genr):
        '''
        Consume every inbound (node, path) pair and emit nothing.

        Args:
            runt: The Storm runtime executing the command (unused here).
            genr: Async generator of inbound (node, path) tuples.
        '''
        async for _node, _path in genr:
            # Hand control back to the event loop between inbound items.
            await asyncio.sleep(0)

        return

        # Never reached: the bare yield only exists so that Python compiles
        # this method as an async generator function.
        yield  # pragma: no cover
[docs]
class CountCmd(Cmd):
    '''
    Iterate through query results, and print the resulting number of nodes
    which were lifted. This does not yield the nodes counted, unless the
    --yield switch is provided.

    Example:

        # Count the number of IPV4 nodes with a given ASN.
        inet:ipv4:asn=20 | count

        # Count the number of IPV4 nodes with a given ASN and yield them.
        inet:ipv4:asn=20 | count --yield
    '''
    readonly = True
    name = 'count'

    def getArgParser(self):
        '''
        Build the argument parser, adding the ``--yield`` switch.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('--yield', dest='yieldnodes', action='store_true',
                            default=False, help='Yield inbound nodes.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Tally the inbound items and print the total once the stream ends.

        Inbound items are only passed downstream when ``--yield`` was given.
        '''
        seen = 0

        async for inbound in genr:

            # The option is read for every item rather than cached up front,
            # exactly as the tally is bumped only after the item was emitted.
            if self.opts.yieldnodes:
                yield inbound

            seen += 1

        mesg = f'Counted {seen} nodes.'
        await runt.printf(mesg)
[docs]
class IdenCmd(Cmd):
    '''
    Lift nodes by iden.

    Example:

        iden b25bc9eec7e159dce879f9ec85fb791f83b505ac55b346fcb64c3c51e98d1175 | count
    '''
    readonly = True
    name = 'iden'

    def getArgParser(self):
        '''
        Build the argument parser, accepting zero or more iden strings.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('iden', type='str', nargs='*', default=[],
                            help='Iden to lift nodes by. May be specified multiple times.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Pass inbound items through, then lift one node per valid iden.

        Idens which fail to decode, or which do not decode to 32 bytes,
        produce a warning and are skipped. Idens with no matching node are
        skipped silently.

        Raises:
            s_exc.StormRuntimeError: If the iden arguments are not runtsafe.
        '''
        if not self.runtsafe:
            raise s_exc.StormRuntimeError(mesg='iden argument must be runtsafe.')

        # Inbound items always flow through ahead of the lifted nodes.
        async for inbound in genr:
            yield inbound

        for text in self.opts.iden:

            try:
                buid = s_common.uhex(text)
            except Exception:
                await asyncio.sleep(0)
                await runt.warn(f'Failed to decode iden: [{text}]')
                continue

            if len(buid) != 32:
                await asyncio.sleep(0)
                await runt.warn(f'iden must be 32 bytes [{text}]')
                continue

            found = await runt.snap.getNodeByBuid(buid)
            if found is None:
                await asyncio.sleep(0)
            else:
                yield found, runt.initPath(found)
[docs]
class SleepCmd(Cmd):
    '''
    Introduce a delay between returning each result for the storm query.

    NOTE: This is mostly used for testing / debugging.

    Example:

        #foo.bar | sleep 0.5
    '''
    readonly = True
    name = 'sleep'

    def getArgParser(self):
        '''
        Build the argument parser, adding the positional ``delay`` argument.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('delay', default=1, type='float',
                            help='Delay in floating point seconds.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Emit each inbound item, then wait on the runtime for the delay.
        '''
        async for inbound in genr:
            yield inbound
            # The wait happens after the item is emitted and is made through
            # the runtime's waitfini() rather than a plain sleep.
            delay = self.opts.delay
            await self.runt.waitfini(delay)
[docs]
class GraphCmd(Cmd):
    '''
    Generate a subgraph from the given input nodes and command line options.

    Example:

        Using the graph command::

            inet:fqdn | graph
                        --degrees 2
                        --filter { -#nope }
                        --pivot { -> meta:seen }
                        --form-pivot inet:fqdn {<- * | limit 20}
                        --form-pivot inet:fqdn {-> * | limit 20}
                        --form-filter inet:fqdn {-inet:fqdn:issuffix=1}
                        --form-pivot syn:tag {-> *}
                        --form-pivot * {-> #}
    '''
    name = 'graph'

    def getArgParser(self):
        '''
        Build the argument parser with the graph projection options.
        '''
        pars = Cmd.getArgParser(self)
        pars.add_argument('--degrees', type='int', default=1, help='How many degrees to graph out.')

        pars.add_argument('--pivot', default=[], action='append',
                          help='Specify a storm pivot for all nodes. (must quote)')
        pars.add_argument('--filter', default=[], action='append',
                          help='Specify a storm filter for all nodes. (must quote)')

        pars.add_argument('--no-edges', default=False, action='store_true',
                          help='Do not include light weight edges in the per-node output.')
        pars.add_argument('--form-pivot', default=[], nargs=2, action='append',
                          help='Specify a <form> <pivot> form specific pivot.')
        pars.add_argument('--form-filter', default=[], nargs=2, action='append',
                          help='Specify a <form> <filter> form specific filter.')

        pars.add_argument('--refs', default=False, action='store_true',
                          help='Do automatic in-model pivoting with node.getNodeRefs().')
        # The two literals below are concatenated by the parser, so the first
        # one must end with a space to keep the sentence readable.
        pars.add_argument('--yield-filtered', default=False, action='store_true', dest='yieldfiltered',
                          help='Yield nodes which would be filtered. This still performs pivots to collect edge data, '
                               'but does not yield pivoted nodes.')
        pars.add_argument('--no-filter-input', default=True, action='store_false', dest='filterinput',
                          help='Do not drop input nodes if they would match a filter.')

        return pars

    async def execStormCmd(self, runt, genr):
        '''
        Translate the parsed options into a rules dictionary and run the
        inbound nodes through an s_ast.SubGraph built from it.

        Raises:
            s_exc.StormRuntimeError: If the graph arguments are not runtsafe.
        '''
        if not self.runtsafe:
            mesg = 'graph arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        rules = {
            'degrees': self.opts.degrees,

            'pivots': list(self.opts.pivot),
            'filters': list(self.opts.filter),

            'forms': {},

            'refs': self.opts.refs,
            'filterinput': self.opts.filterinput,
            'yieldfiltered': self.opts.yieldfiltered,
        }

        # The 'edges' key is only present when edges were explicitly disabled.
        if self.opts.no_edges:
            rules['edges'] = False

        formrules = rules['forms']

        # Each form gets one rule dictionary, created on first reference by
        # either a --form-pivot or a --form-filter option.
        for name, pivo in self.opts.form_pivot:
            formrule = formrules.setdefault(name, {'pivots': [], 'filters': []})
            formrule['pivots'].append(pivo)

        for name, filt in self.opts.form_filter:
            formrule = formrules.setdefault(name, {'pivots': [], 'filters': []})
            formrule['filters'].append(filt)

        subg = s_ast.SubGraph(rules)

        async for node, path in subg.run(runt, genr):
            yield node, path
[docs]
class ViewExecCmd(Cmd):
    '''
    Execute a storm query in a different view.

    NOTE: Variables are passed through but nodes are not. The behavior of this command may be
    non-intuitive in relation to the way storm normally operates. For further information on
    behavior and limitations when using `view.exec`, reference the `view.exec` section of the
    Synapse User Guide: https://v.vtx.lk/view-exec.

    Examples:

        // Move some tagged nodes to another view
        inet:fqdn#foo.bar $fqdn=$node.value() | view.exec 95d5f31f0fb414d2b00069d3b1ee64c6 { [ inet:fqdn=$fqdn ] }
    '''
    readonly = True
    name = 'view.exec'

    def getArgParser(self):
        '''
        Build the argument parser with the ``view`` and ``storm`` arguments.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('view', help='The GUID of the view in which the query will execute.')
        parser.add_argument('storm', help='The storm query to execute on the view.')
        return parser

    async def _execInView(self, runt, opts):
        '''
        Resolve the view and query text from the current options, then run
        the query to completion in a sub-runtime, discarding its output.

        Args:
            runt: The parent Storm runtime.
            opts (dict): Sub-runtime options; the ``view`` key is added here.
        '''
        viewiden = await s_stormtypes.tostr(self.opts.view)
        text = await s_stormtypes.tostr(self.opts.storm)

        opts['view'] = viewiden

        query = await runt.getStormQuery(text)
        async with runt.getSubRuntime(query, opts=opts) as subr:
            async for _item in subr.execute():
                await asyncio.sleep(0)

    async def execStormCmd(self, runt, genr):
        '''
        Run the query once per inbound node, or once in total when there are
        no inbound nodes and the arguments are runtsafe.

        Inbound nodes are re-emitted unchanged; only their path variables are
        handed to the sub-runtime.
        '''
        # Stays None only if the inbound stream produced nothing at all.
        node = None

        async for node, path in genr:
            # nodes may not pass across views, but their path vars may
            await self._execInView(runt, {'vars': path.vars})
            yield node, path

        if node is None and self.runtsafe:
            await self._execInView(runt, {})
[docs]
class BackgroundCmd(Cmd):
    '''
    Execute a query pipeline as a background task.

    NOTE: Variables are passed through but nodes are not
    '''
    name = 'background'

    def getArgParser(self):
        '''
        Build the argument parser with the positional ``query`` argument.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('query', help='The query to execute in the background.')
        return parser

    async def execStormTask(self, query, opts):
        '''
        Promote the current task to a ``storm`` task and run the query to
        completion in a fresh runtime, discarding its output.

        Args:
            query: The parsed Storm query to execute.
            opts (dict): Runtime options; ``view`` is read for the task info.
        '''
        core = self.runt.snap.core
        user = core._userFromOpts(opts)

        taskinfo = {
            'query': query.text,
            'view': opts['view'],
            'background': True,
        }
        await core.boss.promote('storm', user=user, info=taskinfo)

        async with core.getStormRuntime(query, opts=opts) as bgrunt:
            async for _item in bgrunt.execute():
                await asyncio.sleep(0)

    async def execStormCmd(self, runt, genr):
        '''
        Pass all inbound items through, then schedule the background query.

        Raises:
            s_exc.StormRuntimeError: If the query argument is not runtsafe.
        '''
        if not self.runtsafe:
            raise s_exc.StormRuntimeError(mesg='The background query must be runtsafe.')

        async for inbound in genr:
            yield inbound

        text = await s_stormtypes.tostr(self.opts.query)
        query = await runt.getStormQuery(text)

        # make sure the subquery *could* have run
        async with runt.getSubRuntime(query) as subr:
            query.validate(subr)

        scopevars = await s_stormtypes.toprim(self.runt.getScopeVars(), use_list=True)

        # Only variables which pass the msgpack check are handed to the task.
        taskvars = {}
        for varname, varvalu in scopevars.items():
            if s_msgpack.isok(varvalu):
                taskvars[varname] = varvalu

        opts = {
            'user': runt.user.iden,
            'view': runt.snap.view.iden,
            'vars': taskvars,
        }

        runt.snap.core.schedCoro(self.execStormTask(query, opts))
[docs]
class ParallelCmd(Cmd):
    '''
    Execute part of a query pipeline in parallel.
    This can be useful to minimize round-trip delay during enrichments.

    Examples:

        inet:ipv4#foo | parallel { $place = $lib.import(foobar).lookup(:latlong) [ :place=$place ] }

    NOTE: Storm variables set within the parallel query pipelines do not interact.

    NOTE: If there are inbound nodes to the parallel command, parallel pipelines will be created as each node
          is processed, up to the number specified by --size. If the number of nodes in the pipeline is less
          than the value specified by --size, additional pipelines with no inbound node will not be created.
          If there are no inbound nodes to the parallel command, the number of pipelines specified by --size
          will always be created.
    '''
    name = 'parallel'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser with ``--size`` and the ``query`` argument.
        '''
        pars = Cmd.getArgParser(self)

        pars.add_argument('--size', default=8,
                          help='The number of parallel Storm pipelines to execute.')

        pars.add_argument('query',
                          help='The query to execute in parallel.')
        return pars

    async def nextitem(self, inq):
        '''
        Yield items from the input queue until a None sentinel is received.
        '''
        while True:
            item = await inq.get()
            if item is None:
                return

            yield item

    async def pipeline(self, runt, query, inq, outq):
        '''
        Run one pipeline: feed the query from ``inq`` and push its results to
        ``outq``, followed by a None sentinel on normal completion.

        Any Exception raised is placed on ``outq`` instead of propagating so
        the consumer can re-raise it; cancellation is re-raised untouched.
        '''
        try:
            async with runt.getSubRuntime(query) as subr:
                async for item in subr.execute(genr=self.nextitem(inq)):
                    await outq.put(item)

            await outq.put(None)

        except asyncio.CancelledError:  # pragma: no cover
            raise

        except Exception as e:
            await outq.put(e)

    async def execStormCmd(self, runt, genr):
        '''
        Fan inbound items out to up to ``--size`` pipelines and merge output.

        Raises:
            s_exc.StormRuntimeError: If the arguments are not runtsafe, or if
                ``--size`` is less than 1.
        '''
        if not self.runtsafe:
            mesg = 'parallel arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        size = await s_stormtypes.toint(self.opts.size)

        # A size below 1 would build unbounded queues and start no pipelines,
        # leaving the output loop below waiting on outq.get() forever.
        if size < 1:
            mesg = f'The parallel --size argument must be at least 1 (got {size}).'
            raise s_exc.StormRuntimeError(mesg=mesg)

        _query = await s_stormtypes.tostr(self.opts.query)
        query = await runt.getStormQuery(_query)

        # Validate up front so a bad query fails before any task is scheduled.
        async with runt.getSubRuntime(query) as subr:
            query.validate(subr)

        async with await s_base.Base.anit() as base:

            inq = asyncio.Queue(maxsize=size)
            outq = asyncio.Queue(maxsize=size)

            # Start one pipeline per inbound item until either the requested
            # number of pipelines exist or the inbound stream runs dry.
            tsks = 0
            try:
                while tsks < size:
                    await inq.put(await genr.__anext__())
                    base.schedCoro(self.pipeline(runt, query, inq, outq))
                    tsks += 1
            except StopAsyncIteration:
                # One sentinel per running pipeline so each of them exits.
                for _ in range(tsks):
                    await inq.put(None)

            # If a full set of tasks were created, keep pumping nodes into the queue
            if tsks == size:
                async def pump():
                    try:
                        async for pumpitem in genr:
                            await inq.put(pumpitem)
                        for _ in range(size):
                            await inq.put(None)
                    except Exception as e:
                        await outq.put(e)

                base.schedCoro(pump())

            # If no tasks were created, make a full set
            elif tsks == 0:
                tsks = size
                for _ in range(size):
                    base.schedCoro(self.pipeline(runt, query, inq, outq))
                for _ in range(tsks):
                    await inq.put(None)

            # Merge pipeline output until every pipeline has sent its sentinel.
            exited = 0
            while True:
                item = await outq.get()
                if isinstance(item, Exception):
                    raise item

                if item is None:
                    exited += 1
                    if exited == tsks:
                        return
                    continue

                yield item
[docs]
class TeeCmd(Cmd):
    '''
    Execute multiple Storm queries on each node in the input stream, joining output streams together.

    Commands are executed in order they are given; unless the ``--parallel`` switch is provided.

    Examples:

        # Perform a pivot out and pivot in on an inet:ipv4 node
        inet:ipv4=1.2.3.4 | tee { -> * } { <- * }

        # Also emit the inbound node
        inet:ipv4=1.2.3.4 | tee --join { -> * } { <- * }

        # Execute multiple enrichment queries in parallel.
        inet:ipv4=1.2.3.4 | tee -p { enrich.foo } { enrich.bar } { enrich.baz }
    '''
    name = 'tee'
    readonly = True

    def getArgParser(self):
        '''
        Build the argument parser with ``--join``, ``--parallel`` and the
        positional ``query`` arguments.
        '''
        pars = Cmd.getArgParser(self)

        pars.add_argument('--join', '-j', default=False, action='store_true',
                          help='Emit inbound nodes after processing storm queries.')

        pars.add_argument('--parallel', '-p', default=False, action='store_true',
                          help='Run the storm queries in parallel instead of sequence. The node output order is not guaranteed.')

        pars.add_argument('query', nargs='*',
                          help='Specify a query to execute on the input nodes.')

        return pars

    async def _drainPipelines(self, outq, count):
        '''
        Yield items from ``outq`` until ``count`` None sentinels have arrived.

        An Exception instance found on the queue is raised to the caller.
        '''
        exited = 0
        while True:
            item = await outq.get()
            if isinstance(item, Exception):
                raise item

            if item is None:
                exited += 1
                if exited == count:
                    return
                continue  # pragma: no cover

            yield item

    async def execStormCmd(self, runt, genr):
        '''
        Run every configured query for each inbound node, or once with no
        input when the inbound stream is empty.

        Raises:
            s_exc.StormRuntimeError: If the arguments are not runtsafe or no
                query was provided.
        '''
        if not self.runtsafe:
            mesg = 'tee arguments must be runtsafe.'
            raise s_exc.StormRuntimeError(mesg=mesg)

        if not self.opts.query:
            raise s_exc.StormRuntimeError(mesg='Tee command must take at least one query as input.',
                                          name=self.name)

        # The stack keeps one sub-runtime per query open for the whole run.
        async with contextlib.AsyncExitStack() as stack:

            runts = []
            query_arguments = await s_stormtypes.toprim(self.opts.query)
            queries = []
            for arg in query_arguments:
                if isinstance(arg, str):
                    queries.append(arg)
                    continue
                # if a argument is a container/iterable, we'll add
                # whatever content is in it as query text
                for text in arg:
                    queries.append(text)

            for text in queries:
                query = await runt.getStormQuery(text)
                subr = await stack.enter_async_context(runt.getSubRuntime(query))
                runts.append(subr)

            size = len(runts)
            outq_size = size * 2

            # Stays None only if the inbound stream produced nothing at all.
            node = None
            async for node, path in genr:

                if self.opts.parallel and runts:

                    outq = asyncio.Queue(maxsize=outq_size)
                    for subr in runts:
                        # Every query receives its own fork of the inbound path.
                        subg = s_common.agen((node, path.fork(node, None)))
                        self.runt.snap.schedCoro(self.pipeline(subr, outq, genr=subg))

                    async for item in self._drainPipelines(outq, size):
                        yield item

                else:

                    for subr in runts:
                        subg = s_common.agen((node, path.fork(node, None)))
                        async for subitem in subr.execute(genr=subg):
                            yield subitem

                if self.opts.join:
                    yield node, path

            if node is None and self.runtsafe:

                if self.opts.parallel and runts:

                    outq = asyncio.Queue(maxsize=outq_size)
                    for subr in runts:
                        self.runt.snap.schedCoro(self.pipeline(subr, outq))

                    async for item in self._drainPipelines(outq, size):
                        yield item

                else:

                    for subr in runts:
                        async for subitem in subr.execute():
                            yield subitem

    async def pipeline(self, runt, outq, genr=None):
        '''
        Run one sub-runtime and push its results to ``outq``, followed by a
        None sentinel; an Exception is pushed to the queue instead of raised.
        '''
        try:
            async for subitem in runt.execute(genr=genr):
                await outq.put(subitem)

            await outq.put(None)

        except Exception as e:
            await outq.put(e)
[docs]
class TreeCmd(Cmd):
    '''
    Walk elements of a tree using a recursive pivot.

    Examples:

        # pivot upward yielding each FQDN
        inet:fqdn=www.vertex.link | tree { :domain -> inet:fqdn }
    '''
    readonly = True
    name = 'tree'

    def getArgParser(self):
        '''
        Build the argument parser with the positional ``query`` argument.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('query', help='The pivot query')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Emit each inbound node followed by everything reached by repeatedly
        applying the pivot query, depth first.

        Raises:
            s_exc.StormRuntimeError: If the query is not runtsafe, or if a
                RecursionLimitHit is raised while walking.
        '''
        if not self.runtsafe:
            raise s_exc.StormRuntimeError(mesg='tree query must be runtsafe.')

        pivot = await s_stormtypes.tostr(self.opts.query)

        async def walk(curnode, curpath):
            # A node is emitted before any of the nodes it pivots to.
            yield curnode, curpath

            async for child, childpath in curnode.storm(runt, pivot, path=curpath):
                async for found in walk(child, childpath):
                    yield found

        try:

            async for node, path in genr:
                async for found in walk(node, path):
                    yield found

        except s_exc.RecursionLimitHit:
            mesg = 'tree command exceeded maximum depth'
            raise s_exc.StormRuntimeError(mesg=mesg) from None
[docs]
class ScrapeCmd(Cmd):
    '''
    Use textual properties of existing nodes to find other easily recognizable nodes.

    Examples:

        # Scrape properties from inbound nodes and create standalone nodes.
        inet:search:query | scrape

        # Scrape properties from inbound nodes and make refs light edges to the scraped nodes.
        inet:search:query | scrape --refs

        # Scrape only the :engine and :text props from the inbound nodes.
        inet:search:query | scrape :text :engine

        # Scrape the primary property from the inbound nodes.
        it:dev:str | scrape $node.repr()

        # Scrape properties inbound nodes and yield newly scraped nodes.
        inet:search:query | scrape --yield

        # Skip re-fanging text before scraping.
        inet:search:query | scrape --skiprefang

        # Limit scrape to specific forms.
        inet:search:query | scrape --forms (inet:fqdn, inet:ipv4)
    '''
    name = 'scrape'

    def getArgParser(self):
        '''
        Build the argument parser with the scrape options and values.
        '''
        pars = Cmd.getArgParser(self)

        pars.add_argument('--refs', '-r', default=False, action='store_true',
                          help='Create refs light edges to any scraped nodes from the input node')
        pars.add_argument('--yield', dest='doyield', default=False, action='store_true',
                          help='Include newly scraped nodes in the output')
        pars.add_argument('--skiprefang', dest='dorefang', default=True, action='store_false',
                          help='Do not remove de-fanging from text before scraping')
        pars.add_argument('--forms', default=[],
                          help='Only scrape values which match specific forms.')
        pars.add_argument('values', nargs='*',
                          help='Specific relative properties or variables to scrape')
        return pars

    async def _getForms(self):
        '''
        Return the ``--forms`` option as a container of form names.

        A string is split on commas, with surrounding whitespace removed from
        each name so that "inet:fqdn, inet:ipv4" matches both forms. A tuple,
        list or set is returned as is; any other value is wrapped in a tuple.
        '''
        forms = await s_stormtypes.toprim(self.opts.forms)

        if isinstance(forms, str):
            return [form.strip() for form in forms.split(',')]

        if isinstance(forms, (tuple, list, set)):
            return forms

        return (forms,)

    async def execStormCmd(self, runt, genr):
        '''
        Scrape text from each inbound node, or from the literal values when
        there are no inbound nodes and the arguments are runtsafe.

        Inbound nodes are re-emitted unless ``--yield`` is set, in which case
        only the scraped nodes are emitted.
        '''
        # Stays None only if the inbound stream produced nothing at all.
        node = None
        async for node, path in genr:  # type: s_node.Node, s_node.Path

            refs = await s_stormtypes.toprim(self.opts.refs)
            forms = await self._getForms()
            refang = await s_stormtypes.tobool(self.opts.dorefang)

            # TODO some kind of repr or as-string option on toprims
            todo = await s_stormtypes.toprim(self.opts.values)

            # if a list of props haven't been specified, then default to ALL of them
            if not todo:
                todo = list(node.props.values())

            link = {'type': 'scrape'}
            for text in todo:

                text = str(text)

                async for (form, valu, _) in self.runt.snap.view.scrapeIface(text, refang=refang):
                    # An empty forms container means no restriction at all.
                    if forms and form not in forms:
                        continue

                    nnode = await node.snap.addNode(form, valu)
                    npath = path.fork(nnode, link)

                    if refs:
                        if node.form.isrunt:
                            mesg = f'Edges cannot be used with runt nodes: {node.form.full}'
                            await runt.warn(mesg)
                        else:
                            await node.addEdge('refs', nnode.iden())

                    if self.opts.doyield:
                        yield nnode, npath

            if not self.opts.doyield:
                yield node, path

        if self.runtsafe and node is None:

            forms = await self._getForms()
            refang = await s_stormtypes.tobool(self.opts.dorefang)

            for item in self.opts.values:
                text = str(await s_stormtypes.toprim(item))

                async for (form, valu, _) in self.runt.snap.view.scrapeIface(text, refang=refang):
                    if forms and form not in forms:
                        continue

                    addnode = await runt.snap.addNode(form, valu)
                    if self.opts.doyield:
                        yield addnode, runt.initPath(addnode)
[docs]
class LiftByVerb(Cmd):
    '''
    Lift nodes from the current view by an light edge verb.

    Examples:

        # Lift all the n1 nodes for the light edge "foo"
        lift.byverb "foo"

        # Lift all the n2 nodes for the light edge "foo"
        lift.byverb --n2 "foo"

    Notes:

        Only a single instance of a node will be yielded from this command
        when that node is lifted via the light edge membership.
    '''
    name = 'lift.byverb'

    def getArgParser(self):
        '''
        Build the argument parser with the ``verb`` argument and ``--n2``.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('verb', required=True, type='str',
                            help='The edge verb to lift nodes by.')
        parser.add_argument('--n2', default=False, action='store_true',
                            help='Lift by the N2 value instead of N1 value.')
        return parser

    async def iterEdgeNodes(self, verb, idenset, n2=False):
        '''
        Yield the node on one side of every edge with the given verb.

        Args:
            verb (str): The light edge verb to look up in the view.
            idenset: Set of idens already handled; each new iden is added to
                it, and idens already present are skipped.
            n2 (bool): Use the N2 side of each edge instead of the N1 side.
        '''
        view = self.runt.snap.view

        async for n1iden, _verb, n2iden in view.getEdges(verb):

            iden = n2iden if n2 else n1iden
            if iden in idenset:
                continue

            # The iden is recorded before the lookup, so an iden which does
            # not resolve to a node is not retried on later edges.
            await idenset.add(iden)

            found = await self.runt.snap.getNodeByBuid(s_common.uhex(iden))
            if found:
                yield found

    async def execStormCmd(self, runt, genr):
        '''
        Emit inbound items along with the nodes lifted by edge verb.

        When runtsafe, all inbound items are emitted first and the lift runs
        once. Otherwise the verb is resolved for every inbound node and the
        lift runs after each one, sharing a single set of seen idens.
        '''
        core = self.runt.snap.core

        async with await s_spooled.Set.anit(dirn=core.dirn, cell=core) as seen:

            if not self.runtsafe:

                async for inode, ipath in genr:
                    verb = await s_stormtypes.tostr(self.opts.verb)
                    usen2 = self.opts.n2

                    yield inode, ipath

                    link = {'type': 'runtime'}
                    async for lifted in self.iterEdgeNodes(verb, seen, usen2):
                        yield lifted, ipath.fork(lifted, link)

                return

            verb = await s_stormtypes.tostr(self.opts.verb)
            usen2 = self.opts.n2

            async for inbound in genr:
                yield inbound

            async for lifted in self.iterEdgeNodes(verb, seen, usen2):
                yield lifted, runt.initPath(lifted)
[docs]
class EdgesDelCmd(Cmd):
    '''
    Bulk delete light edges from input nodes.

    Examples:

        # Delete all "foo" light edges from an inet:ipv4
        inet:ipv4=1.2.3.4 | edges.del foo

        # Delete light edges with any verb from a node
        inet:ipv4=1.2.3.4 | edges.del *

        # Delete all "foo" light edges to an inet:ipv4
        inet:ipv4=1.2.3.4 | edges.del foo --n2
    '''
    name = 'edges.del'

    def getArgParser(self):
        '''
        Build the argument parser with the ``verb`` argument and ``--n2``.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('verb', type='str', help='The verb of light edges to delete.')
        parser.add_argument('--n2', default=False, action='store_true',
                            help='Delete light edges where input node is N2 instead of N1.')
        return parser

    async def delEdges(self, node, verb, n2=False):
        '''
        Delete the light edges of a node which match the verb.

        Args:
            node: The node whose edges are removed.
            verb: The verb passed to the node's edge iterators.
            n2 (bool): Delete edges pointing at the node rather than from it.
        '''
        if not n2:
            async for edgeverb, dstiden in node.iterEdgesN1(verb):
                await node.delEdge(edgeverb, dstiden)
            return

        # Inbound edges are removed from the node on the other end, and only
        # when that node can actually be resolved.
        dstiden = node.iden()
        async for edgeverb, srciden in node.iterEdgesN2(verb):
            srcnode = await self.runt.snap.getNodeByBuid(s_common.uhex(srciden))
            if srcnode is not None:
                await srcnode.delEdge(edgeverb, dstiden)

    async def _reqVerb(self, runt):
        '''
        Resolve the verb option and confirm the matching delete permission.

        Returns:
            The verb string, or None when the wildcard ``*`` was given.
        '''
        verb = await s_stormtypes.tostr(self.opts.verb)

        if verb == '*':
            runt.layerConfirm(('node', 'edge', 'del'))
            return None

        runt.layerConfirm(('node', 'edge', 'del', verb))
        return verb

    async def execStormCmd(self, runt, genr):
        '''
        Delete matching light edges on each inbound node and re-emit it.

        When runtsafe, the verb is resolved and the permission confirmed once
        before any node is consumed; otherwise both happen for every node.
        '''
        if not self.runtsafe:

            async for node, path in genr:
                usen2 = self.opts.n2
                verb = await self._reqVerb(runt)

                await self.delEdges(node, verb, usen2)
                yield node, path

            return

        usen2 = self.opts.n2
        verb = await self._reqVerb(runt)

        async for node, path in genr:
            await self.delEdges(node, verb, usen2)
            yield node, path
[docs]
class OnceCmd(Cmd):
    '''
    The once command is used to filter out nodes which have already been processed
    via the use of a named key. It includes an optional parameter to allow the node
    to pass the filter again after a given amount of time.

    For example, to run an enrichment command on a set of nodes just once:

        file:bytes#my.files | once enrich:foo | enrich.foo

    The once command filters out any nodes which have previously been through any other
    use of the "once" command using the same <name> (in this case "enrich:foo").

    You may also specify the --asof option to allow nodes to pass the filter after a given
    amount of time. For example, the following command will allow any given node through
    every 2 days:

        file:bytes#my.files | once enrich:foo --asof "-2 days" | enrich.foo

    Use of "--asof now" or any future date or positive relative time offset will always
    allow the node to pass the filter.

    State tracking data for the once command is stored as nodedata which is stored in your
    view's write layer, making it view-specific. So if you have two views, A and B, and they
    do not share any layers between them, and you execute this query in view A:

        inet:ipv4=8.8.8.8 | once enrich:address | enrich.baz

    And then you run it in view B, the node will still pass through the once command to the
    enrich.baz portion of the query because the tracking data for the once command does not
    yet exist in view B.
    '''
    name = 'once'

    def getArgParser(self):
        '''
        Build the argument parser with the ``name`` argument and ``--asof``.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('name', type='str', help='Name of the action to only perform once.')
        parser.add_argument('--asof', type='time', default=None, help='The associated time the name was updated/performed.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Emit only the inbound nodes which have no tracking record under the
        ``once:<name>`` key, or whose record is not newer than ``--asof``.

        Every node which is emitted has its record set to the current time.
        '''
        async for node, path in genr:

            tick = s_common.now()
            name = await s_stormtypes.tostr(self.opts.name)
            key = f'once:{name}'

            record = await node.getData(key)

            if record is not None:

                asof = self.opts.asof
                last = record.get('tick')

                if last is None:
                    # edge case to account for old storage format: rewrite the
                    # record with a tick, and still filter the node out.
                    await node.setData(key, {'tick': tick})
                    await asyncio.sleep(0)
                    continue

                if asof is None or last > asof:
                    await asyncio.sleep(0)
                    continue

            await node.setData(key, {'tick': tick})

            yield node, path
[docs]
class TagPruneCmd(Cmd):
    '''
    Prune a tag (or tags) from nodes.

    This command will delete the tags specified as parameters from incoming nodes,
    as well as all of their parent tags that don't have other tags as children.

    For example, given a node with the tags:

        #parent
        #parent.child
        #parent.child.grandchild

    Pruning the parent.child.grandchild tag would remove all tags. If the node had
    the tags:

        #parent
        #parent.child
        #parent.child.step
        #parent.child.grandchild

    Pruning the parent.child.grandchild tag will only remove the parent.child.grandchild
    tag as the parent tags still have other children.

    Examples:

        # Prune the parent.child.grandchild tag
        inet:ipv4=1.2.3.4 | tag.prune parent.child.grandchild
    '''
    name = 'tag.prune'

    def getArgParser(self):
        '''
        Build the argument parser with the positional ``tags`` arguments.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('tags', nargs='*', default=[], help='Names of tags to prune.')
        return parser

    async def _pruneNode(self, node, tags):
        '''
        Delete each tag from the node, then walk its parent tags in the order
        given, deleting them until one still has child tags.

        Args:
            node: The node to remove tags from.
            tags (dict): Maps each tag name to the sequence of its parents.
        '''
        for tag, parents in tags.items():
            await node.delTag(tag)

            for parent in parents:
                # Stop climbing at the first parent that is still in use.
                if self.hasChildTags(node, parent):
                    break
                await node.delTag(parent)

    async def execStormCmd(self, runt, genr):
        '''
        Prune the requested tags from each inbound node and re-emit it.

        When runtsafe, the tag arguments are resolved and the delete
        permission on each root tag is confirmed once, before any node is
        consumed. Otherwise they are resolved for every node, and each root
        tag is confirmed only the first time it is seen.
        '''
        if not self.runtsafe:

            confirmed = set()

            async for node, path in genr:

                names = [await s_stormtypes.tostr(t) for t in self.opts.tags]

                tags = {}
                for name in names:
                    root = name.split('.')[0]
                    if root not in confirmed:
                        runt.layerConfirm(('node', 'tag', 'del', root))
                        confirmed.add(root)

                    # The tag's ancestors, with the tag itself left out.
                    tags[name] = s_chop.tags(name)[-2::-1]

                await self._pruneNode(node, tags)

                yield node, path

            return

        names = [await s_stormtypes.tostr(t) for t in self.opts.tags]

        tags = {}
        for name in names:
            root = name.split('.')[0]
            runt.layerConfirm(('node', 'tag', 'del', root))
            tags[name] = s_chop.tags(name)[-2::-1]

        async for node, path in genr:
            await self._pruneNode(node, tags)
            yield node, path
[docs]
class RunAsCmd(Cmd):
    '''
    Execute a storm query as a specified user.

    NOTE: This command requires admin privileges.

    NOTE: Heavy objects (for example a View or Layer) are bound to the context which they
          are instantiated in and methods on them will be run using the user in that
          context. This means that executing a method on a variable containing a heavy
          object which was instantiated outside of the runas command and then used
          within the runas command will check the permissions of the outer user, not
          the one specified by the runas command.

    Examples:

        // Create a node as another user.
        runas someuser { [ inet:fqdn=foo.com ] }
    '''
    name = 'runas'

    def getArgParser(self):
        '''
        Build the argument parser with ``user``, ``storm`` and ``--asroot``.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('user', help='The user name or iden to execute the storm query as.')
        parser.add_argument('storm', help='The storm query to execute.')
        parser.add_argument('--asroot', action='store_true', default=False, help='Propagate asroot to query subruntime.')
        return parser

    async def _execAsUser(self, runt, user, query, opts):
        '''
        Run the query to completion as the given user, discarding its output.

        A snap is opened on the parent runtime's view for that user, and the
        new runtime copies the parent's debug and readonly settings. The
        parent's asroot setting is only copied when ``--asroot`` was given.
        '''
        core = runt.snap.core

        async with await core.snap(user=user, view=runt.snap.view) as snap:
            async with await Runtime.anit(query, snap, user=user, opts=opts, root=runt) as subr:

                subr.debug = runt.debug
                subr.readonly = runt.readonly

                if self.opts.asroot:
                    subr.asroot = runt.asroot

                async for _item in subr.execute():
                    await asyncio.sleep(0)

    async def execStormCmd(self, runt, genr):
        '''
        Run the query once per inbound node, or once in total when there are
        no inbound nodes and the arguments are runtsafe.

        Raises:
            s_exc.AuthDeny: If the calling runtime is not admin.
        '''
        if not runt.isAdmin():
            caller = self.runt.user
            raise s_exc.AuthDeny(mesg='The runas command requires admin privileges.',
                                 user=caller.iden, username=caller.name)

        core = runt.snap.core

        # Stays None only if the inbound stream produced nothing at all.
        node = None
        async for node, path in genr:

            username = await s_stormtypes.tostr(self.opts.user)
            text = await s_stormtypes.tostr(self.opts.storm)

            # Here the user is resolved before the query text is parsed.
            user = await core.auth.reqUserByNameOrIden(username)
            query = await runt.getStormQuery(text)

            await self._execAsUser(runt, user, query, {'vars': path.vars})

            yield node, path

        if node is None and self.runtsafe:

            username = await s_stormtypes.tostr(self.opts.user)
            text = await s_stormtypes.tostr(self.opts.storm)

            # Here the query text is parsed before the user is resolved.
            query = await runt.getStormQuery(text)
            user = await core.auth.reqUserByNameOrIden(username)

            await self._execAsUser(runt, user, query, {'user': user})
[docs]
class IntersectCmd(Cmd):
    '''
    Yield an intersection of the results of running inbound nodes through a pivot.

    NOTE:
        This command must consume the entire inbound stream to produce the intersection.
        This type of stream consuming before yielding results can cause the query to appear
        laggy in comparison with normal incremental stream operations.

    Examples:

        // Show the it:mitre:attack:technique nodes common to several groups

        it:mitre:attack:group*in=(G0006, G0007) | intersect { -> it:mitre:attack:technique }
    '''
    name = 'intersect'

    def getArgParser(self):
        '''
        Build the argument parser with the required ``query`` argument.
        '''
        parser = Cmd.getArgParser(self)
        parser.add_argument('query', required=True, type='str', help='The pivot query to run each inbound node through.')
        return parser

    async def execStormCmd(self, runt, genr):
        '''
        Emit the nodes which the pivot query produced once for every inbound
        node, judged by comparing a per-buid hit count to the inbound total.

        Raises:
            s_exc.StormRuntimeError: If the arguments are not runtsafe.
        '''
        if not self.runtsafe:
            raise s_exc.StormRuntimeError(mesg='intersect arguments must be runtsafe.')

        core = self.runt.snap.core

        async with await s_spooled.Dict.anit(dirn=core.dirn, cell=core) as hitcounts, \
                await s_spooled.Dict.anit(dirn=core.dirn, cell=core) as varsbybuid:

            text = await s_stormtypes.tostr(self.opts.query)
            query = await runt.getStormQuery(text)

            # Phase one: run every inbound node through the pivot, counting
            # how many times each resulting buid is produced and remembering
            # the path variables of its most recent appearance.
            total = 0
            async for node, path in genr:

                total += 1
                await asyncio.sleep(0)

                async with runt.getSubRuntime(query) as subr:

                    single = s_common.agen((node, path))
                    async for subn, subp in subr.execute(genr=single):

                        buid = subn.buid
                        prior = hitcounts.get(buid)
                        hits = 1 if prior is None else prior + 1

                        await hitcounts.set(buid, hits)
                        await varsbybuid.set(buid, await s_stormtypes.toprim(subp.vars))
                        await asyncio.sleep(0)

            # Phase two: emit only the buids whose hit count equals the
            # number of inbound nodes consumed.
            for buid, hits in hitcounts.items():

                if hits != total:
                    await asyncio.sleep(0)
                    continue

                node = await runt.snap.getNodeByBuid(buid)
                if node is None:
                    continue

                path = runt.initPath(node)
                path.vars.update(varsbybuid.get(buid))
                yield (node, path)