Storm Service Development
Anatomy of a Storm Service
A Storm Service (see Service) is a standalone application that extends the capabilities of the Cortex. One common use case for creating a service is to add a Storm command that will query third-party data, translate the results into the Synapse datamodel, and then ingest them into the hypergraph.
In order to leverage core functionalities it is recommended that Storm services are created as Cell implementations, and the documentation that follows will assume this. For additional information see Synapse Architecture.
A Storm service generally implements the following components:
A Package that contains the new Storm Service Commands and optional new Storm Service Modules.
A subclass of synapse.lib.CellApi which uses the synapse.lib.StormSvc mixin and contains the following information:
    The service name, version, packages, and events as defined in synapse.lib.StormSvc.
    Custom methods which will be accessible as Telepath API endpoints, and therefore available for use within defined Storm commands.
A subclass of synapse.lib.Cell which includes additional configuration definitions and methods required to implement the service.
When implemented as a Cell, methods can also optionally have custom permissions applied to them.
If a specific rule is added it should be namespaced with the service name, e.g. svcname.rule1.
Alternatively, a method can be wrapped with @s_cell.adminapi() to only allow admin access.
For additional details see Minimal Storm Service Example.
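The rule namespacing described above can be sketched in plain Python. This is an illustration only: rule_to_perm and is_namespaced are hypothetical helpers, not part of Synapse, and the sketch assumes that a dotted rule such as example.info corresponds to the tuple ('example', 'info') passed to a permission check.

```python
def rule_to_perm(rule):
    """Split a dotted rule string into a permission tuple (hypothetical helper)."""
    return tuple(rule.split('.'))


def is_namespaced(rule, svcname):
    """Return True if the rule has the service name as its first element."""
    perm = rule_to_perm(rule)
    return len(perm) > 1 and perm[0] == svcname


perm = rule_to_perm('svcname.rule1')
print(perm)
print(is_namespaced('svcname.rule1', 'svcname'))
print(is_namespaced('rule1', 'svcname'))
```

Keeping the service name as the first element lets an administrator see at a glance which service a rule belongs to.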
Connecting a service
Before connecting a service to a Cortex it is a best practice to add a new service user, which can be accomplished with synapse.tools.cellauth. For example:
python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --adduser
python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --passwd secret
If the service requires specific permissions for a new user they can also be added:
python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --addrule svcname.rule1
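When several users need to be provisioned, the same cellauth invocations could be assembled from Python. The sketch below only builds the argument lists and does not run them. cellauth_argv is a hypothetical helper, and the URL keeps the placeholders from the commands above, which must be replaced with real values.

```python
import sys


def cellauth_argv(url, user, *options):
    """Build the argument list for one synapse.tools.cellauth 'modify' call."""
    return [sys.executable, '-m', 'synapse.tools.cellauth', url, 'modify', user, *options]


# Placeholder values; substitute the real root password, host, and port.
url = 'tcp://root:<root_passwd>@<svc_ip>:<svc_port>'

steps = [
    cellauth_argv(url, 'svcuser1', '--adduser'),
    cellauth_argv(url, 'svcuser1', '--passwd', 'secret'),
    cellauth_argv(url, 'svcuser1', '--addrule', 'svcname.rule1'),
]

for argv in steps:
    # Each list could be passed to subprocess.run(argv, check=True).
    print(' '.join(argv[1:]))
```

Building a list per call, rather than one shell string, avoids quoting problems if a password contains shell metacharacters.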
Permissions to access the service can be granted by adding the service.get.<svc_iden> rule to the appropriate users / roles in the Cortex.
A Storm command can be run on the Cortex to add the new service, and the new service will now be present in the service list and Storm help.
Services are added to a Cortex with the service.add command.
storm> service.add mysvc tcp://root:[email protected]:41197/
added 38758ca99a6a92b2fafe8d7737cf3dbe (mysvc): tcp://root:[email protected]:41197/
complete. 0 nodes in 14 ms (0/sec).
Services that have been connected to the Cortex can be listed with the service.list command.
storm> service.list
Storm service list (iden, ready, name, service name, service version, url):
38758ca99a6a92b2fafe8d7737cf3dbe false (mysvc) (Unknown @ Unknown): tcp://root:[email protected]:41197/
1 services
complete. 0 nodes in 195 ms (0/sec).
Storm Service Commands
Implementation
Multiple Storm commands can be added to a Storm service package, with each defining the following attributes:
name: Name of the Storm command to expose in the Cortex.
descr: Description of the command which will be available in help displays.
cmdargs: An optional list of arguments for the command.
cmdconf: An optional dictionary of additional configuration variables to provide to the command Storm execution.
forms: List of input and output forms for the command.
storm: The Storm code, as a string, that will be executed when the command is called.
Typically, the Storm code will start by getting a reference to the service via $svc = $lib.service.get($cmdconf.svciden) and reading in any defined cmdargs that are available in $cmdopts. The methods defined in the service’s Cell API can then be called by, for example, $retn = $svc.mysvcmethod($cmdopts.query).
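As a rough illustration of the attributes listed above, a command definition can be written as a Python dictionary and sanity-checked before it is added to a package. The command name mysvc.query, the query argument, and the check_cmd_def helper are hypothetical, and treating name and storm as the only mandatory keys is an assumption of this sketch rather than a rule enforced by Synapse.

```python
REQUIRED_KEYS = ('name', 'storm')  # assumption: treated as mandatory in this sketch
OPTIONAL_KEYS = ('descr', 'cmdargs', 'cmdconf', 'forms')


def check_cmd_def(cdef):
    """Return a list of problems found in a command definition dict."""
    problems = [f'missing key: {key}' for key in REQUIRED_KEYS if key not in cdef]
    allowed = set(REQUIRED_KEYS) | set(OPTIONAL_KEYS)
    problems.extend(f'unknown key: {key}' for key in sorted(set(cdef) - allowed))
    return problems


cmd_def = {
    'name': 'mysvc.query',
    'descr': 'Query the hypothetical mysvc service.',
    'cmdargs': (
        ('query', {'help': 'The query string to send to the service.'}),
    ),
    'cmdconf': {},
    'forms': {'input': [], 'output': []},
    'storm': '''
        $svc = $lib.service.get($cmdconf.svciden)
        $retn = $svc.mysvcmethod($cmdopts.query)
    ''',
}

print(check_cmd_def(cmd_def))
print(check_cmd_def({'descr': 'no name or storm'}))
```

A check like this catches misspelled keys early, before the service is connected to a Cortex.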
Input/Output Conventions
Most commands that enrich or add additional context to nodes should simply yield the nodes they were given as inputs. If they don’t know how to enrich or add additional context to a given form, nodes of that form should be yielded rather than producing an error. This allows a series of enrichment commands to be pipelined regardless of the different inputs that a given command knows how to operate on.
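The pass-through convention can be modeled with Python generators. This is an analogy, not Storm: nodes are represented here as hypothetical (form, value, props) tuples, and enrich_fqdn and enrich_ipv4 stand in for two enrichment commands chained in one pipeline.

```python
def enrich_fqdn(nodes):
    """Enrich inet:fqdn nodes; pass every other form through untouched."""
    for form, valu, props in nodes:
        if form == 'inet:fqdn':
            props = {**props, 'enriched_by': 'fqdn-cmd'}
        yield (form, valu, props)


def enrich_ipv4(nodes):
    """Enrich inet:ipv4 nodes; pass every other form through untouched."""
    for form, valu, props in nodes:
        if form == 'inet:ipv4':
            props = {**props, 'enriched_by': 'ipv4-cmd'}
        yield (form, valu, props)


inbound = [
    ('inet:fqdn', 'good.com', {}),
    ('inet:ipv4', '1.2.3.4', {}),
    ('inet:email', 'user@good.com', {}),
]

# Neither stage drops or rejects a form it does not understand.
outbound = list(enrich_ipv4(enrich_fqdn(inbound)))
for node in outbound:
    print(node)
```

Because each stage yields unsupported forms unchanged, the stages can be reordered or extended without breaking the pipeline.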
Argument Conventions
--verbose
In general, Storm commands should operate silently over their input nodes and should especially avoid printing anything “per node”.
However, when an error occurs, the command may use $lib.warn() to print a warning message per-node.
Commands should implement a --verbose command line option to enable printing “per node” informational output.
--debug
For commands where additional messaging would assist in debugging, a --debug command line option should be implemented.
For example, a Storm command that is querying a third-party data source could use $lib.print() to print the raw query string and raw response when the --debug option is specified.
--yield
For commands that create additional nodes, it may be beneficial to add a --yield option to allow a query to operate on the newly created nodes.
Some guidelines for --yield options:
The command should not yield the input node(s) when --yield is specified.
The --yield option should not be implemented when pivoting from the input node to reach the newly created node is a “refs out” or 1-to-1 direct pivot. For example, there is no need to have a --yield option on the maxmind command even though it may create an inet:asn node for an input inet:ipv4 node due to the 1-to-1 pivot -> inet:asn being possible.
The --yield option should ideally determine a “primary” node form to yield even when the command may create many forms in order to tag them or update .seen times.
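The three conventional options can be declared together in a cmdargs tuple. The sketch below follows the argument format used by the example service in this document. The help strings are illustrative, and select_output is a hypothetical helper that models the --yield rule in plain Python rather than Storm.

```python
cmdargs = (
    ('--verbose', {'default': False, 'action': 'store_true',
                   'help': 'Print informational output for each node.'}),
    ('--debug', {'default': False, 'action': 'store_true',
                 'help': 'Print the raw query and response.'}),
    ('--yield', {'default': False, 'action': 'store_true',
                 'help': 'Yield the newly created nodes instead of the input nodes.'}),
)


def select_output(inputs, created, yield_created):
    """Model the --yield convention: emit created nodes or inputs, never both."""
    return list(created) if yield_created else list(inputs)


print([name for name, _ in cmdargs])
print(select_output(['inet:fqdn=good.com'], ['inet:ipv4=1.2.3.4'], False))
print(select_output(['inet:fqdn=good.com'], ['inet:ipv4=1.2.3.4'], True))
```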
Storm Service Modules
Modules can be added to a Storm service package to expose reusable Storm functions.
Each module defines a name, which is used for importing elsewhere via $lib.import(), and a storm string. The Storm code in this case contains callable functions with the format:
function myfunc(var1, var2) {
    // function Storm code
}
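In package terms, a module is a small dictionary holding the name and the Storm source. The sketch below shows one possible definition and how a command in the same package might import it. The module name mysvc.utils and the $cmdopts.var1 and $cmdopts.var2 arguments are hypothetical.

```python
mod_name = 'mysvc.utils'  # hypothetical module name

mod_storm = '''
function myfunc(var1, var2) {
    // function Storm code
}
'''

module_def = {'name': mod_name, 'storm': mod_storm}

# A command in the same package could then import the module by name
# and call its functions as attributes of the imported object.
cmd_storm = f'''
$utils = $lib.import({mod_name})
$retn = $utils.myfunc($cmdopts.var1, $cmdopts.var2)
'''

print(module_def['name'])
print(cmd_storm)
```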
Minimal Storm Service Example
A best practice is to separate the Storm and service code into separate files, and nest within a synmods directory to avoid Python namespace conflicts:
service-example
├── synmods
│   └── example
│       ├── __init__.py
│       ├── service.py
│       ├── storm.py
│       └── version.py
The Storm package and the service should also maintain consistent versioning.
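One way to keep the two versions aligned is to define the version once and reuse it for both the service and the package. The contents of version.py are not shown in this document, so the layout below is an assumption.

```python
# Hypothetical contents of synmods/example/version.py
version = (0, 0, 1)
verstring = '.'.join(str(part) for part in version)

# service.py and storm.py would both import the same tuple, for example:
#   from synmods.example.version import version
svc_vers = version
pkg_def = {'name': 'example', 'version': version}

# The service version and the package version cannot drift apart.
assert pkg_def['version'] == svc_vers
print(verstring)
```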
For convenience, the example below shows the Storm code included in the service.py file.
service.py
import sys
import asyncio

import synapse.lib.cell as s_cell
import synapse.lib.stormsvc as s_stormsvc

# The Storm definitions below are included here for convenience
# but are typically contained in a separate storm.py file and imported to service.py.
# Other Storm commands could be created to call the additional Telepath endpoints.
svc_name = 'example'
svc_guid = '0ecc1eb65659a0f07141bc1a360abda3'  # can be generated with synapse.common.guid()
svc_vers = (0, 0, 1)
svc_minvers = (2, 8, 0)

svc_evts = {
    'add': {
        'storm': f'[(meta:source={svc_guid} :name="Example data")]'
    }
}

svc_mod_ingest_storm = '''
function ingest_ips(data, srcguid) {

    $results = $lib.set()

    for $ip in $data {
        [ inet:ipv4=$ip ]

        // Lightweight edge back to meta:source
        { [ <(seen)+ { meta:source=$srcguid } ] }

        { +inet:ipv4 $results.add($node) }
    }

    | spin |

    return($results)
}
'''

# The first line of this description will display in the Storm help
svc_cmd_get_desc = '''
Query the Example service.

Examples:

    # Query the service and create an IPv4 node
    inet:fqdn=good.com | example.get

    # Query the service and yield the created inet:ipv4 node
    inet:fqdn=good.com | example.get --yield
'''

svc_cmd_get_forms = {
    'input': [
        'inet:fqdn',
    ],
    'output': [
        'inet:ipv4',
    ],
}

svc_cmd_get_args = (
    ('--yield', {'default': False, 'action': 'store_true',
                 'help': 'Whether to yield the created nodes to the output stream.'}),
    ('--debug', {'default': False, 'action': 'store_true',
                 'help': 'Enable debug output.'}),
)

svc_cmd_get_conf = {
    'srcguid': svc_guid,
}

svc_cmd_get_storm = '''
init {
    $svc = $lib.service.get($cmdconf.svciden)
    $ingest = $lib.import(example.ingest)
    $srcguid = $cmdconf.srcguid
    $debug = $cmdopts.debug
    $yield = $cmdopts.yield
}

// $node is a special variable that references the inbound Node object
$form = $node.form()

switch $form {
    "inet:fqdn": {
        $query=$node.repr()
    }
    *: {
        $query=""
        $lib.warn("Example service does not support {form} nodes", form=$form)
    }
}

// Yield behavior to drop the inbound node
if $yield { spin }

// Call the service endpoint and ingest the results
if $query {
    if $debug { $lib.print("example.get query: {query}", query=$query) }

    $retn = $svc.getData($query)

    if $retn.status {
        $results = $ingest.ingest_ips($retn.data, $srcguid)

        if $yield {
            for $result in $results { $lib.print($result) yield $result }
        }
    } else {
        $lib.warn("example.get error: {err}", err=$retn.mesg)
    }
}
'''

svc_cmds = (
    {
        'name': f'{svc_name}.get',
        'descr': svc_cmd_get_desc,
        'cmdargs': svc_cmd_get_args,
        'cmdconf': svc_cmd_get_conf,
        'forms': svc_cmd_get_forms,
        'storm': svc_cmd_get_storm,
    },
)

svc_pkgs = (
    {
        'name': svc_name,
        'version': svc_vers,
        'synapse_minversion': svc_minvers,
        'modules': (
            {
                'name': f'{svc_name}.ingest',
                'storm': svc_mod_ingest_storm,
            },
        ),
        'commands': svc_cmds,
    },
)

class ExampleApi(s_cell.CellApi, s_stormsvc.StormSvc):
    '''
    A Telepath API for the Example service.
    '''

    # These defaults must be overridden from the StormSvc mixin
    _storm_svc_name = svc_name
    _storm_svc_vers = svc_vers
    _storm_svc_evts = svc_evts
    _storm_svc_pkgs = svc_pkgs

    async def getData(self, query):
        return await self.cell.getData(query)

    async def getInfo(self):
        await self._reqUserAllowed(('example', 'info'))
        return await self.cell.getInfo()

    @s_cell.adminapi()
    async def getAdminInfo(self):
        return await self.cell.getAdminInfo()

class Example(s_cell.Cell):

    cellapi = ExampleApi

    confdefs = {
        'api_key': {
            'type': 'string',
            'description': 'API key for accessing an external service.',
        },
        'api_url': {
            'type': 'string',
            'description': 'The URL for an external service.',
            'default': 'https://example.com',
        },
    }

    async def __anit__(self, dirn, conf):
        await s_cell.Cell.__anit__(self, dirn, conf=conf)
        self.apikey = self.conf.get('api_key')
        self.apiurl = self.conf.get('api_url')

    async def getData(self, query):
        # Best practice is to also return a status and optional message in case of an error
        retn = {
            'status': True,
            'data': None,
            'mesg': None,
        }

        # Retrieving and parsing data would go here
        if query == 'good.com':
            data = ['1.2.3.4', '5.6.7.8']
            retn['data'] = data

        else:
            retn['status'] = False
            retn['mesg'] = 'An error occurred during data retrieval.'

        return retn

    async def getInfo(self):
        info = {
            'generic': 'info',
        }

        return info

    async def getAdminInfo(self):
        info = {
            'admin': 'info',
        }

        return info
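
The status / data / mesg return convention used by getData can be exercised without a running Cortex. The sketch below is a standalone approximation: get_data is a stand-in with the same return shape as Example.getData, and handle is a hypothetical Python counterpart to the branching done in the example.get Storm code.

```python
import asyncio


async def get_data(query):
    """Stand-in for Example.getData with the same return shape."""
    retn = {'status': True, 'data': None, 'mesg': None}
    if query == 'good.com':
        retn['data'] = ['1.2.3.4', '5.6.7.8']
    else:
        retn['status'] = False
        retn['mesg'] = 'An error occurred during data retrieval.'
    return retn


async def handle(query):
    """Mirror the Storm command: ingest on success, warn on failure."""
    retn = await get_data(query)
    if retn['status']:
        return ('ingest', retn['data'])
    return ('warn', retn['mesg'])


good = asyncio.run(handle('good.com'))
bad = asyncio.run(handle('bad.com'))
print(good)
print(bad)
```

Returning a status flag rather than raising lets the Storm command report the failure with $lib.warn() and continue with the remaining inbound nodes.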