Storm Service Development

Anatomy of a Storm Service

A Storm Service (see Service) is a standalone application that extends the capabilities of the Cortex. One common use case for creating a service is to add a Storm command that will query third-party data, translate the results into the Synapse datamodel, and then ingest them into the hypergraph.

In order to leverage core functionalities it is recommended that Storm services are created as Cell implementations, and the documentation that follows will assume this. For additional information see Synapse Architecture.

A Storm service generally implements the following components:

  • A Package that contains the new Storm Service Commands and optional new Storm Service Modules.

  • A subclass of synapse.lib.cell.CellApi which uses the synapse.lib.stormsvc.StormSvc mixin and contains the following information:

    • The service name, version, packages, and events as defined in synapse.lib.stormsvc.StormSvc.
    • Custom methods which will be accessible as Telepath API endpoints, and therefore available for use within defined Storm commands.
  • A subclass of synapse.lib.cell.Cell which includes additional configuration definitions and methods required to implement the service.

When implemented as a Cell, methods can also optionally have custom permissions applied to them. If a specific rule is added it should be namespaced with the service name, e.g. svcname.rule1. Alternatively, a method can be wrapped with @s_cell.adminapi() to only allow admin access.
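The namespacing convention can be illustrated with a small standalone helper. This is only a sketch: rule_to_perm and is_namespaced are hypothetical names, not part of Synapse, and the sketch assumes that a dotted rule such as svcname.rule1 corresponds to a tuple of path elements like the one passed to _reqUserAllowed() in the example service at the end of this document.

```python
def rule_to_perm(rule: str) -> tuple:
    """Split a dotted rule string into a tuple of permission path elements."""
    parts = tuple(rule.split('.'))
    if not all(parts):
        # Reject rules such as '' or 'svcname..rule1'
        raise ValueError(f'empty element in rule: {rule!r}')
    return parts


def is_namespaced(rule: str, svcname: str) -> bool:
    """Check that the first element of the rule is the service name."""
    return rule_to_perm(rule)[0] == svcname


print(rule_to_perm('svcname.rule1'))
print(is_namespaced('example.info', 'example'))
```

A check like this could be run over the rules a service defines to catch names that would collide with rules from other services.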

For additional details see Minimal Storm Service Example.

Connecting a service

For instructions on configuring and starting a Cell service see Configuring a Cell.

Before connecting a service to a Cortex it is a best practice to add a new service user, which can be accomplished with synapse.tools.cellauth. For example:

python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --adduser
python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --passwd secret

If the service requires specific permissions for a new user they can also be added:

python -m synapse.tools.cellauth tcp://root:<root_passwd>@<svc_ip>:<svc_port> modify svcuser1 --addrule svcname.rule1

Permissions to access the service can be granted by adding the service.get.<svc_iden> rule to the appropriate users / roles in the Cortex.

Running a Storm command on the Cortex adds the new service, after which the service is present in the service list and in Storm help.

Services are added to a Cortex with the service.add command.

cli> storm service.add mysvc tcp://root:secret@127.0.0.1:46363/mysvc
Executing query at 2021/09/17 13:28:39.557
added edf9254c1b156796b1ddc2d5b639cb77 (mysvc): tcp://root:secret@127.0.0.1:46363/mysvc
complete. 0 nodes in 14 ms (0/sec).

Services that have been connected to the Cortex can be listed with the service.list command.

cli> storm service.list
Executing query at 2021/09/17 13:28:39.599

Storm service list (iden, ready, name, service name, service version, url):
    edf9254c1b156796b1ddc2d5b639cb77 True (mysvc) (mysvc @ 0.0.1): tcp://root:secret@127.0.0.1:46363/mysvc

1 services
complete. 0 nodes in 325 ms (0/sec).

Storm Service Commands

Implementation

Multiple Storm commands can be added to a Storm service package, with each defining the following attributes:

  • name: Name of the Storm command to expose in the Cortex.
  • descr: Description of the command which will be available in help displays.
  • cmdargs: An optional list of arguments for the command.
  • cmdconf: An optional dictionary of additional configuration variables to provide to the command Storm execution.
  • forms: List of input and output forms for the command.
  • storm: The Storm code, as a string, that will be executed when the command is called.

Typically, the Storm code will start by getting a reference to the service via $svc = $lib.service.get($cmdconf.svciden) and reading in any defined cmdargs that are available in $cmdopts. The methods defined in the service’s Cell API can then be called by, for example, $retn = $svc.mysvcmethod($cmdopts.query).
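As a rough illustration, a command definition using the attributes above might look like the following Python dictionary. The command name mysvc.query, the positional query argument, and the check_cmd_def helper are all hypothetical, and treating name, descr, and storm as the required keys is an assumption made for this sketch rather than something Synapse is known to enforce in this way.

```python
# Keys taken from the attribute list above. Which keys are strictly
# required is an assumption of this sketch.
REQUIRED_KEYS = ('name', 'descr', 'storm')
OPTIONAL_KEYS = ('cmdargs', 'cmdconf', 'forms')


def check_cmd_def(cmd: dict) -> list:
    """Return a list of problems found in a command definition (empty if none)."""
    problems = [f'missing key: {key}' for key in REQUIRED_KEYS if key not in cmd]
    known = set(REQUIRED_KEYS) | set(OPTIONAL_KEYS)
    problems.extend(f'unknown key: {key}' for key in sorted(set(cmd) - known))
    return problems


# Hypothetical command that calls a hypothetical mysvcmethod() endpoint.
mysvc_cmd = {
    'name': 'mysvc.query',
    'descr': 'Query the mysvc service.',
    'cmdargs': (
        ('query', {'help': 'The query string to send to the service.'}),
    ),
    'storm': '''
        $svc = $lib.service.get($cmdconf.svciden)
        $retn = $svc.mysvcmethod($cmdopts.query)
    ''',
}

print(check_cmd_def(mysvc_cmd))
```

Running a check of this kind before starting the service can surface typos in key names early, since a misspelled key would otherwise only be noticed when the package is loaded.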

Input/Output Conventions

Most commands that enrich or add additional context to nodes should simply yield the nodes they were given as inputs. If they don’t know how to enrich or add additional context to a given form, nodes of that form should be yielded rather than producing an error. This allows a series of enrichment commands to be pipelined regardless of the different inputs that a given command knows how to operate on.
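The pass-through convention can be modeled outside of Storm with plain Python generators. The sketch below is an analogy only: nodes are simplified to (form, value) tuples, and enrich_fqdn and enrich_ipv4 are hypothetical stand-ins for enrichment commands.

```python
from typing import Iterable, Iterator, List, Tuple

Node = Tuple[str, str]  # (form, value): a simplified stand-in for a node


def enrich_fqdn(nodes: Iterable[Node], log: List[str]) -> Iterator[Node]:
    """Enrich inet:fqdn nodes and pass every other form through unchanged."""
    for form, valu in nodes:
        if form == 'inet:fqdn':
            log.append(f'fqdn enriched: {valu}')  # enrichment would happen here
        yield (form, valu)  # always yield the inbound node, supported or not


def enrich_ipv4(nodes: Iterable[Node], log: List[str]) -> Iterator[Node]:
    """Enrich inet:ipv4 nodes and pass every other form through unchanged."""
    for form, valu in nodes:
        if form == 'inet:ipv4':
            log.append(f'ipv4 enriched: {valu}')
        yield (form, valu)


inbound = [('inet:fqdn', 'good.com'), ('inet:ipv4', '1.2.3.4')]
log: List[str] = []

# Equivalent in spirit to piping one enrichment command into another
outbound = list(enrich_ipv4(enrich_fqdn(inbound, log), log))
print(outbound)
print(log)
```

Because neither stage drops or rejects forms it does not recognize, the two stages can be chained in either order and each node still reaches the stage that knows how to handle it.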

Argument Conventions

--verbose

In general, Storm commands should operate silently over their input nodes and should especially avoid printing anything “per node”. However, when an error occurs, the command may use $lib.warn() to print a warning message per-node. Commands should implement a --verbose command line option to enable printing “per node” informational output.

--debug

For commands where additional messaging would assist in debugging, a --debug command line option should be implemented. For example, a Storm command that queries a third-party data source could use $lib.print() to print the raw query string and raw response when the --debug option is specified.
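Both options could be declared in the same cmdargs format used by the example service at the end of this document. The following is a sketch: the help strings, the message text, and the $query variable in the Storm fragment are illustrative placeholders.

```python
# Option definitions in the same (name, options) format as the
# svc_cmd_get_args tuple in the example service. Help strings are illustrative.
cmdargs = (
    ('--verbose', {'default': False, 'action': 'store_true',
                   'help': 'Print informational output for each node.'}),
    ('--debug', {'default': False, 'action': 'store_true',
                 'help': 'Print raw queries and responses for debugging.'}),
)

# Storm fragment that stays silent unless one of the flags is set.
# $query is a placeholder assumed to be set earlier in the command.
storm = '''
if $cmdopts.verbose { $lib.print("processing {valu}", valu=$node.repr()) }
if $cmdopts.debug { $lib.print("raw query: {query}", query=$query) }
'''

for name, opts in cmdargs:
    print(name, opts['help'])
```

Defaulting both flags to False keeps the command silent in pipelines unless the caller explicitly asks for output.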

--yield

For commands that create additional nodes, it may be beneficial to add a --yield option to allow a query to operate on the newly created nodes. Some guidelines for --yield options:

  • The command should not yield the input node(s) when --yield is specified.
  • The --yield option should not be implemented when pivoting from the input node to reach the newly created node is a “refs out” or 1-to-1 direct pivot. For example, there is no need to have a --yield option on the maxmind command even though it may create an inet:asn node for an input inet:ipv4 node, because the 1-to-1 pivot -> inet:asn is already possible.
  • The --yield option should ideally determine a “primary” node form to yield even when the command may create many forms in order to tag them or update .seen times.
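The first and last guidelines can be sketched with a Python generator. This is an analogy rather than Storm: nodes are simplified to (form, value) tuples, and run_cmd, fake_create, and the choice of inet:ipv4 as the primary form are assumptions made for illustration.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Node = Tuple[str, str]  # (form, value): a simplified stand-in for a node


def run_cmd(nodes: Iterable[Node],
            create: Callable[[Node], List[Node]],
            do_yield: bool = False,
            primary_form: str = 'inet:ipv4') -> Iterator[Node]:
    """Yield inbound nodes by default, or only created primary nodes with do_yield."""
    for node in nodes:
        created = create(node)  # nodes are created either way
        if not do_yield:
            yield node  # default: pass the inbound node through
            continue
        for new in created:
            if new[0] == primary_form:  # skip secondary forms
                yield new


def fake_create(node: Node) -> List[Node]:
    # Hypothetical results: one primary node and one secondary node
    return [('inet:ipv4', '1.2.3.4'), ('inet:dns:a', 'good.com -> 1.2.3.4')]


inbound = [('inet:fqdn', 'good.com')]
print(list(run_cmd(inbound, fake_create)))
print(list(run_cmd(inbound, fake_create, do_yield=True)))
```

Restricting the yielded nodes to one primary form keeps the output of the command predictable for whatever comes next in the query.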

Storm Service Modules

Modules can be added to a Storm service package to expose reusable Storm functions. Each module defines a name, which is used for importing elsewhere via $lib.import(), and a storm string. The Storm code in this case contains callable functions with the format:

function myfunc(var1, var2) {
    // function Storm code
}

Minimal Storm Service Example

A best practice is to keep the Storm and service code in separate files, nested within a synmods directory to avoid Python namespace conflicts:

service-example
├── synmods
│   └── example
│       ├── __init__.py
│       ├── service.py
│       ├── storm.py
│       └── version.py

The Storm package and the service should also maintain consistent versioning.
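One way to keep the two versions consistent is to define the version once and reference it from both places. The sketch below assumes version.py holds a single version tuple. The verstring name and the ExampleVersions class are hypothetical and stand in for the real service class.

```python
# Contents that might live in version.py (assumed layout)
version = (0, 0, 1)
verstring = '.'.join(str(part) for part in version)

# In service.py the same tuple would then feed both the package
# definition and the service class attribute.
svc_vers = version

svc_pkgs = (
    {
        'name': 'example',
        'version': svc_vers,
    },
)


class ExampleVersions:
    """Stand-in for the service API class; only the version fields are shown."""
    _storm_svc_vers = svc_vers
    _storm_svc_pkgs = svc_pkgs


print(verstring)
```

With this arrangement a version bump is a one-line change, and the package and service versions cannot drift apart.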

For convenience, the example below shows the Storm code included in the service.py file.

service.py

import sys
import asyncio

import synapse.lib.cell as s_cell
import synapse.lib.stormsvc as s_stormsvc

# The Storm definitions below are included here for convenience
# but are typically contained in a separate storm.py file and imported to service.py.
# Other Storm commands could be created to call the additional Telepath endpoints.
svc_name = 'example'
svc_guid = '0ecc1eb65659a0f07141bc1a360abda3'  # can be generated with synapse.common.guid()
svc_vers = (0, 0, 1)
svc_minvers = (2, 8, 0)

svc_evts = {
    'add': {
        'storm': f'[(meta:source={svc_guid} :name="Example data")]'
    }
}

svc_mod_ingest_storm = '''
function ingest_ips(data, srcguid) {
    $results = $lib.set()

    for $ip in $data {
        [ inet:ipv4=$ip ]

        // Lightweight edge back to meta:source
        { [ <(seen)+ { meta:source=$srcguid } ] }

        { +inet:ipv4 $results.add($node) }
    }

    | spin |

    return($results)
}
'''

# The first line of this description will display in the Storm help
svc_cmd_get_desc = '''
Query the Example service.

Examples:

    # Query the service and create an IPv4 node
    inet:fqdn=good.com | example.get

    # Query the service and yield the created inet:ipv4 node
    inet:fqdn=good.com | example.get --yield
'''

svc_cmd_get_forms = {
    'input': [
        'inet:fqdn',
    ],
    'output': [
        'inet:ipv4',
    ],
}

svc_cmd_get_args = (
    ('--yield', {'default': False, 'action': 'store_true',
                 'help': 'Whether to yield the created nodes to the output stream.'}),
    ('--debug', {'default': False, 'action': 'store_true',
                 'help': 'Enable debug output.'}),
)

svc_cmd_get_conf = {
    'srcguid': svc_guid,
}

svc_cmd_get_storm = '''
init {
    $svc = $lib.service.get($cmdconf.svciden)
    $ingest = $lib.import(example.ingest)
    $srcguid = $cmdconf.srcguid
    $debug = $cmdopts.debug
    $yield = $cmdopts.yield
}

// $node is a special variable that references the inbound Node object
$form = $node.form()

switch $form {
    "inet:fqdn": {
        $query=$node.repr()
    }
    *: {
        $query=""
        $lib.warn("Example service does not support {form} nodes", form=$form)
    }
}

// Yield behavior to drop the inbound node
if $yield { spin }

// Call the service endpoint and ingest the results
if $query {
    if $debug { $lib.print("example.get query: {query}", query=$query) }

    $retn = $svc.getData($query)

    if $retn.status {
        $results = $ingest.ingest_ips($retn.data, $srcguid)

        if $yield {
            for $result in $results { $lib.print($result) yield $result }
        }
    } else {
        $lib.warn("example.get error: {err}", err=$retn.mesg)
    }
}
'''

svc_cmds = (
    {
        'name': f'{svc_name}.get',
        'descr': svc_cmd_get_desc,
        'cmdargs': svc_cmd_get_args,
        'cmdconf': svc_cmd_get_conf,
        'forms': svc_cmd_get_forms,
        'storm': svc_cmd_get_storm,
    },
)

svc_pkgs = (
    {
        'name': svc_name,
        'version': svc_vers,
        'synapse_minversion': svc_minvers,
        'modules': (
            {
                'name': f'{svc_name}.ingest',
                'storm': svc_mod_ingest_storm,
            },
        ),
        'commands': svc_cmds,
    },
)

class ExampleApi(s_cell.CellApi, s_stormsvc.StormSvc):
    '''
    A Telepath API for the Example service.
    '''

    # These defaults must be overridden from the StormSvc mixin
    _storm_svc_name = svc_name
    _storm_svc_vers = svc_vers
    _storm_svc_evts = svc_evts
    _storm_svc_pkgs = svc_pkgs

    async def getData(self, query):
        return await self.cell.getData(query)

    async def getInfo(self):
        await self._reqUserAllowed(('example', 'info'))
        return await self.cell.getInfo()

    @s_cell.adminapi()
    async def getAdminInfo(self):
        return await self.cell.getAdminInfo()

class Example(s_cell.Cell):

    cellapi = ExampleApi

    confdefs = {
        'api_key': {
            'type': 'string',
            'description': 'API key for accessing an external service.',
        },
        'api_url': {
            'type': 'string',
            'description': 'The URL for an external service.',
            'default': 'https://example.com',
        },
    }

    async def __anit__(self, dirn, conf):
        await s_cell.Cell.__anit__(self, dirn, conf=conf)
        self.apikey = self.conf.get('api_key')
        self.apiurl = self.conf.get('api_url')

    async def getData(self, query):
        # Best practice is to also return a status and optional message in case of an error
        retn = {
            'status': True,
            'data': None,
            'mesg': None,
        }

        # Retrieving and parsing data would go here
        if query == 'good.com':
            data = ['1.2.3.4', '5.6.7.8']
            retn['data'] = data

        else:
            retn['status'] = False
            retn['mesg'] = 'An error occurred during data retrieval.'

        return retn

    async def getInfo(self):
        info = {
            'generic': 'info',
        }

        return info

    async def getAdminInfo(self):
        info = {
            'admin': 'info',
        }

        return info