synapse package

The synapse intelligence analysis framework.

Subpackages

Submodules

synapse.axon module

class synapse.axon.Axon[source]

Bases: Cell

byterange = False
cellapi

alias of AxonApi

confdefs = {'http:proxy': {'description': 'An aiohttp-socks compatible proxy URL to use in the wget API.', 'type': 'string'}, 'max:bytes': {'description': 'The maximum number of bytes that can be stored in the Axon.', 'hidecmdl': True, 'minimum': 1, 'type': 'integer'}, 'max:count': {'description': 'The maximum number of files that can be stored in the Axon.', 'hidecmdl': True, 'minimum': 1, 'type': 'integer'}, 'tls:ca:dir': {'description': 'An optional directory of CAs which are added to the TLS CA chain for wget and wput APIs.', 'type': 'string'}}
async csvrows(sha256, dialect='excel', errors='ignore', **fmtparams)[source]
async del_(sha256)[source]

Remove the given bytes from the Axon by sha256.

Parameters:

sha256 (bytes) – The sha256, in bytes, to remove from the Axon.

Returns:

True if the file is removed; false if the file is not present.

Return type:

boolean

async dels(sha256s)[source]

Given a list of sha256 hashes, delete the files from the Axon.

Parameters:

sha256s (list) – A list of sha256 hashes in bytes form.

Returns:

A list of booleans, indicating if the file was deleted or not.

Return type:

list

async get(sha256, offs=None, size=None)[source]

Get bytes of a file.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file in bytes.

  • offs (int) – The offset to start reading from.

  • size (int) – The total number of bytes to read.

Examples

Get the bytes from an Axon and process them:

buf = b''
async for bytz in axon.get(sha256):
    buf += bytz

await dostuff(buf)

Yields:

bytes – Chunks of the file bytes.

Raises:

synapse.exc.NoSuchFile – If the file does not exist.

async getCellInfo()[source]

Return metadata specific for the Cell.

Notes

By default, this function returns information about the base Cell implementation, which reflects the base information in the Synapse Cell.

It is expected that implementers override the following Class attributes in order to provide meaningful version information:

COMMIT - A Git Commit.
VERSION - A Version tuple.
VERSTRING - A Version string.

Returns:

A Dictionary of metadata.

Return type:

Dict

async has(sha256)[source]

Check if the Axon has a file.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

True if the Axon has the file; false otherwise.

Return type:

boolean

async hashes(offs, wait=False, timeout=None)[source]

Yield hash rows for files that exist in the Axon in added order starting at an offset.

Parameters:
  • offs (int) – The index offset.

  • wait (boolean) – Wait for new results and yield them in realtime.

  • timeout (int) – Max time to wait for new results.

Yields:

(int, (bytes, int)) – An index offset and the file SHA-256 and size.

Note

If the same hash was deleted and then added back, the same hash will be yielded twice.
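The offset-based iteration can be sketched as follows. This is a minimal sketch, assuming axon is an Axon-like object whose hashes() is an async generator yielding (index, (sha256, size)) rows as described above. drain_hashes is a hypothetical helper, and resuming from the last index plus one is an assumption of this sketch rather than documented behavior.

```python
async def drain_hashes(axon, offs=0):
    """Collect hash rows starting at offs and return them with a resume offset.

    Hypothetical helper: ``axon`` is any object exposing the hashes() API
    documented above.
    """
    rows = []
    async for indx, (sha256, size) in axon.hashes(offs):
        rows.append((sha256, size))
        # Assumption: the next unseen row starts at the last index plus one.
        offs = indx + 1
    return rows, offs
```

A caller could persist the returned offset and pass it back in on the next run to pick up only newly added rows.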

async hashset(sha256)[source]

Calculate additional hashes for a file in the Axon.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

A dictionary containing hashes of the file.

Return type:

dict

async history(tick, tock=None)[source]

Yield hash rows for files that exist in the Axon after a given point in time.

Parameters:
  • tick (int) – The starting time (in epoch milliseconds).

  • tock (int) – The ending time to stop iterating at (in epoch milliseconds).

Yields:

(int, (bytes, int)) – A tuple containing the time the hash was added and the file SHA-256 and size.
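Since tick and tock are epoch milliseconds, a caller starting from datetime values needs a conversion first. The following is a minimal sketch using only the standard library. to_epoch_millis is a hypothetical helper, and treating naive datetimes as UTC is an assumption of this sketch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(dt):
    """Convert a datetime to integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        # Assumption of this sketch: naive datetimes are interpreted as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)

tick = to_epoch_millis(datetime(2024, 1, 1, tzinfo=timezone.utc))
tock = to_epoch_millis(datetime(2024, 1, 2, tzinfo=timezone.utc))
```

The resulting tick and tock values could then be passed to history() to cover a one-day window.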

holdHashLock(hashbyts)[source]

A context manager that synchronizes edit access to a blob.

Parameters:

hashbyts (bytes) – The blob to hold the lock for.

async initServiceRuntime()[source]
async initServiceStorage()[source]
async iterMpkFile(sha256)[source]

Yield items from a MsgPack (.mpk) file in the Axon.

Parameters:

sha256 (str) – The sha256 hash of the file as a string.

Yields:

Unpacked items from the bytes.

async jsonlines(sha256, errors='ignore')[source]
async metrics()[source]

Get the runtime metrics of the Axon.

Returns:

A dictionary of runtime data about the Axon.

Return type:

dict

async postfiles(fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None, proxy=None, ssl_opts=None)[source]

Send files from the axon as fields in a multipart/form-data HTTP request.

Parameters:
  • fields (list) – List of dicts containing the fields to add to the request as form-data.

  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

  • proxy (bool|str|null) – Use a specific proxy or disable proxy use.

  • ssl_opts (dict) – Additional SSL/TLS options.

Notes

The dictionaries in the fields list may contain the following values:

{
    'name': <str> - Name of the field.
    'sha256': <str> - SHA256 hash of the file to submit for this field.
    'value': <str> - Value for the field. Ignored if a sha256 has been specified.
    'filename': <str> - Optional filename for the field.
    'content_type': <str> - Optional content type for the field.
    'content_transfer_encoding': <str> - Optional content-transfer-encoding header for the field.
}

The ssl_opts dictionary may contain the following values:

{
    'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument.
    'client_cert': <str> - PEM encoded full chain certificate for use in mTLS.
    'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert.
}

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'err': <tuple> - Tuple of the error type and information if an exception occurred.
    'url': <str> - The URL retrieved (which could have been redirected)
    'code': <int> - The response code.
    'body': <bytes> - The response body.
    'reason': <str> - The reason phrase for the HTTP status code.
    'headers': <dict> - The response headers as a dictionary.
}

Returns:

An information dictionary containing the results of the request.

Return type:

dict
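A fields list matching the layout in the Notes above can be assembled with plain dictionaries. This is a minimal sketch. file_field and value_field are hypothetical helpers, and the hash shown is only a placeholder (the SHA-256 of an empty file).

```python
def file_field(name, sha256_hex, filename=None, content_type=None):
    """Build a form field whose content comes from a file stored in the Axon."""
    field = {'name': name, 'sha256': sha256_hex}
    # Optional keys are only added when a value was supplied.
    if filename is not None:
        field['filename'] = filename
    if content_type is not None:
        field['content_type'] = content_type
    return field

def value_field(name, value):
    """Build a plain form field carrying a literal string value."""
    return {'name': name, 'value': value}

# Placeholder hash: the SHA-256 of zero bytes.
EMPTY_SHA256 = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

fields = [
    file_field('file', EMPTY_SHA256, filename='sample.bin',
               content_type='application/octet-stream'),
    value_field('comment', 'submitted from the Axon'),
]
```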

async put(byts)[source]

Store bytes in the Axon.

Parameters:

byts (bytes) – The bytes to store in the Axon.

Notes

This API should not be used for files greater than 128 MiB in size.

Returns:

A tuple with the file size and sha256 hash of the bytes.

Return type:

tuple(int, bytes)
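To illustrate the shape of the return value, the sketch below computes locally the tuple that put() would be expected to return for some bytes. It assumes the hash is a standard SHA-256 digest in raw bytes form. expected_put_result is a hypothetical helper and not part of Synapse.

```python
import hashlib

def expected_put_result(byts):
    """Return (size, sha256 digest as raw bytes) for the given bytes."""
    return len(byts), hashlib.sha256(byts).digest()

size, sha256 = expected_put_result(b'hello')
# The digest is 32 raw bytes; use .hex() when a string form is needed.
print(size, sha256.hex())
```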

async puts(files)[source]

Store a set of bytes in the Axon.

Parameters:

files (list) – A list of bytes to store in the Axon.

Notes

This API should not be used for storing more than 128 MiB of bytes at once.

Returns:

A list containing tuples of file size and sha256 hash of the saved bytes.

Return type:

list(tuple(int, bytes))

async readlines(sha256, errors='ignore')[source]
async save(sha256, genr, size)[source]

Save a generator of bytes to the Axon.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file in bytes.

  • genr – The bytes generator function.

Returns:

The size of the bytes saved.

Return type:

int

async size(sha256)[source]

Get the size of a file in the Axon.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

The size of the file, in bytes. If not present, None is returned.

Return type:

int

async upload()[source]

Get an Upload object.

Notes

The UpLoad object should be used to manage uploads greater than 128 MiB in size.

Examples

Use an UpLoad object to upload a file to the Axon:

async with await axon.upload() as upfd:
    # Assumes bytsgenerator yields bytes
    async for byts in bytsgenerator():
        await upfd.write(byts)
    await upfd.save()

Use a single UpLoad object to save multiple files:

async with await axon.upload() as upfd:
    for fp in file_paths:
        # Assumes bytsgenerator yields bytes
        async for byts in bytsgenerator(fp):
            await upfd.write(byts)
        await upfd.save()

Returns:

An Upload manager object.

Return type:

UpLoad
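The examples above assume a bytsgenerator that yields bytes. One possible shape for it is sketched below, reading a local file in fixed-size chunks. The chunk size is an arbitrary choice for this sketch, and the file reads are blocking, which may not be suitable for every deployment.

```python
import asyncio

CHUNK_SIZE = 16 * 1024 * 1024  # arbitrary chunk size chosen for this sketch

async def bytsgenerator(path, chunk_size=CHUNK_SIZE):
    """Yield the contents of a local file as a series of bytes chunks."""
    with open(path, 'rb') as fd:
        while True:
            byts = fd.read(chunk_size)  # blocking read
            if not byts:
                break
            yield byts
            # Give other tasks a chance to run between chunks.
            await asyncio.sleep(0)
```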

async wants(sha256s)[source]

Get a list of sha256 values the axon does not have from an input list.

Parameters:

sha256s (list) – A list of sha256 values as bytes.

Returns:

A list of bytes containing the sha256 hashes the Axon does not have.

Return type:

list
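A common use of wants() is to avoid re-sending bytes the Axon already holds. The following is a minimal sketch combining it with put(), assuming axon is an Axon-like object exposing both APIs as documented. put_missing is a hypothetical helper.

```python
import hashlib

async def put_missing(axon, blobs):
    """Store only the blobs whose sha256 the Axon reports as missing.

    Hypothetical helper: returns the list of (size, sha256) tuples from put().
    """
    # Index the candidate blobs by their raw sha256 digest.
    by_hash = {hashlib.sha256(byts).digest(): byts for byts in blobs}
    missing = await axon.wants(list(by_hash))
    results = []
    for sha256 in missing:
        results.append(await axon.put(by_hash[sha256]))
    return results
```

As with put() itself, this pattern is only appropriate for small blobs; larger files would go through upload().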

async wget(url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None, proxy=None, ssl_opts=None)[source]

Stream a file download directly into the Axon.

Parameters:
  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • json – A JSON body which is included with the request.

  • body – The body to be included in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

  • proxy (bool|str|null) – Use a specific proxy or disable proxy use.

  • ssl_opts (dict) – Additional SSL/TLS options.

Notes

The response body will be stored, regardless of the response code. The ok value in the response does not reflect that a status code, such as a 404, was encountered when retrieving the URL.

The ssl_opts dictionary may contain the following values:

{
    'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument.
    'client_cert': <str> - PEM encoded full chain certificate for use in mTLS.
    'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert.
}

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'url': <str> - The URL retrieved (which could have been redirected). This is a url-decoded string.
    'code': <int> - The response code.
    'reason': <str> - The reason phrase for the HTTP status code.
    'mesg': <str> - An error message if there was an exception when retrieving the URL.
    'err': <tuple> - An error tuple if there was an exception when retrieving the URL.
    'headers': <dict> - The response headers as a dictionary.
    'size': <int> - The size in bytes of the response body.
    'hashes': {
        'md5': <str> - The MD5 hash of the response body.
        'sha1': <str> - The SHA1 hash of the response body.
        'sha256': <str> - The SHA256 hash of the response body.
        'sha512': <str> - The SHA512 hash of the response body.
    },
    'request': {
        'url': The request URL. This is a url-decoded string.
        'headers': The request headers.
        'method': The request method.
    }
    'history': A sequence of response bodies to track any redirects, not including hashes.
}

Returns:

An information dictionary containing the results of the request.

Return type:

dict
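Because ok only reports whether an exception occurred, callers generally need to check code as well. The sketch below shows one way to interpret the returned dictionary, using only the keys listed above. wget_succeeded and describe_wget are hypothetical helpers, and treating only 2xx codes as success is a choice made for this sketch.

```python
def wget_succeeded(info):
    """Return True only if the request completed and the status code is 2xx."""
    if not info.get('ok'):
        return False
    code = info.get('code')
    return code is not None and 200 <= code < 300

def describe_wget(info):
    """Build a one-line summary of a wget() result dictionary."""
    if not info.get('ok'):
        return f"request failed: {info.get('mesg')}"
    sha256 = info.get('hashes', {}).get('sha256')
    return (f"HTTP {info.get('code')} {info.get('reason')}: "
            f"stored {info.get('size')} bytes as {sha256}")
```

Note that a 404 response would still have ok set to True and its body stored, so wget_succeeded() returns False for it while describe_wget() reports the stored size and hash.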

async wput(sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None, filename=None, filemime=None, proxy=None, ssl_opts=None)[source]

Stream a blob from the axon as the body of an HTTP request.

class synapse.axon.AxonApi[source]

Bases: CellApi, Share

async csvrows(sha256, dialect='excel', errors='ignore', **fmtparams)[source]

Yield CSV rows from a CSV file.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file.

  • dialect (str) – The CSV dialect to use.

  • errors (str) – Specify how encoding errors should be handled.

  • **fmtparams – The CSV dialect format parameters.

Notes

The dialect and fmtparams expose the Python csv.reader() parameters.

Examples

Get the rows from a CSV file and process them:

async for row in axon.csvrows(sha256):
    await dostuff(row)

Get the rows from a tab separated file and process them:

async for row in axon.csvrows(sha256, delimiter='\t'):
    await dostuff(row)

Yields:

list – Decoded CSV rows.
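Since dialect and fmtparams are passed through to Python's csv.reader(), their effect can be checked locally before reading a file from the Axon. This is a minimal sketch operating on an in-memory string. parse_rows is a hypothetical helper and is not how the Axon itself reads the file.

```python
import csv
import io

def parse_rows(text, dialect='excel', **fmtparams):
    """Parse CSV text with the same dialect/fmtparams that csv.reader() accepts."""
    return list(csv.reader(io.StringIO(text), dialect, **fmtparams))

# A tab separated sample, parsed by overriding the delimiter.
rows = parse_rows('name\tsize\nfoo.bin\t42\n', delimiter='\t')
```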

async del_(sha256)[source]

Remove the given bytes from the Axon by sha256.

Parameters:

sha256 (bytes) – The sha256, in bytes, to remove from the Axon.

Returns:

True if the file is removed; false if the file is not present.

Return type:

boolean

async dels(sha256s)[source]

Given a list of sha256 hashes, delete the files from the Axon.

Parameters:

sha256s (list) – A list of sha256 hashes in bytes form.

Returns:

A list of booleans, indicating if the file was deleted or not.

Return type:

list

async get(sha256, offs=None, size=None)[source]

Get bytes of a file.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file in bytes.

  • offs (int) – The offset to start reading from.

  • size (int) – The total number of bytes to read.

Examples

Get the bytes from an Axon and process them:

buf = b''
async for bytz in axon.get(sha256):
    buf += bytz

await dostuff(buf)

Yields:

bytes – Chunks of the file bytes.

Raises:

synapse.exc.NoSuchFile – If the file does not exist.

async has(sha256)[source]

Check if the Axon has a file.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

True if the Axon has the file; false otherwise.

Return type:

boolean

async hashes(offs, wait=False, timeout=None)[source]

Yield hash rows for files that exist in the Axon in added order starting at an offset.

Parameters:
  • offs (int) – The index offset.

  • wait (boolean) – Wait for new results and yield them in realtime.

  • timeout (int) – Max time to wait for new results.

Yields:

(int, (bytes, int)) – An index offset and the file SHA-256 and size.

async hashset(sha256)[source]

Calculate additional hashes for a file in the Axon.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

A dictionary containing hashes of the file.

Return type:

dict

async history(tick, tock=None)[source]

Yield hash rows for files that exist in the Axon after a given point in time.

Parameters:
  • tick (int) – The starting time (in epoch milliseconds).

  • tock (int) – The ending time to stop iterating at (in epoch milliseconds).

Yields:

(int, (bytes, int)) – A tuple containing the time the hash was added and the file SHA-256 and size.

async iterMpkFile(sha256)[source]

Yield items from a MsgPack (.mpk) file in the Axon.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Yields:

Unpacked items from the bytes.

async jsonlines(sha256, errors='ignore')[source]

Yield JSON objects from JSONL (JSON lines) file.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file.

  • errors (str) – Specify how encoding errors should be handled.

Yields:

object – Decoded JSON objects.
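For reference, the JSONL format holds one JSON document per line. The sketch below decodes such text locally with the standard library. iter_jsonl is a hypothetical helper, and skipping blank lines is an assumption of this sketch that may not match the Axon's own handling.

```python
import json

def iter_jsonl(text):
    """Yield one decoded JSON object per non-empty line of text."""
    for line in text.splitlines():
        line = line.strip()
        if not line:
            # Assumption of this sketch: blank lines are skipped.
            continue
        yield json.loads(line)

items = list(iter_jsonl('{"a": 1}\n[1, 2]\n\n"x"\n'))
```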

async metrics()[source]

Get the runtime metrics of the Axon.

Returns:

A dictionary of runtime data about the Axon.

Return type:

dict

async postfiles(fields, url, params=None, headers=None, method='POST', ssl=True, timeout=None, proxy=None, ssl_opts=None)[source]
async put(byts)[source]

Store bytes in the Axon.

Parameters:

byts (bytes) – The bytes to store in the Axon.

Notes

This API should not be used for files greater than 128 MiB in size.

Returns:

A tuple with the file size and sha256 hash of the bytes.

Return type:

tuple(int, bytes)

async puts(files)[source]

Store a set of bytes in the Axon.

Parameters:

files (list) – A list of bytes to store in the Axon.

Notes

This API should not be used for storing more than 128 MiB of bytes at once.

Returns:

A list containing tuples of file size and sha256 hash of the saved bytes.

Return type:

list(tuple(int, bytes))

async readlines(sha256, errors='ignore')[source]

Yield lines from a multi-line text file in the axon.

Parameters:
  • sha256 (bytes) – The sha256 hash of the file.

  • errors (str) – Specify how encoding errors should be handled.

Yields:

str – Lines of text.

async size(sha256)[source]

Get the size of a file in the Axon.

Parameters:

sha256 (bytes) – The sha256 hash of the file in bytes.

Returns:

The size of the file, in bytes. If not present, None is returned.

Return type:

int

async upload()[source]

Get an Upload object.

Notes

The UpLoad object should be used to manage uploads greater than 128 MiB in size.

Examples

Use an UpLoad object to upload a file to the Axon:

async with await axonProxy.upload() as upfd:
    # Assumes bytsgenerator yields bytes
    async for byts in bytsgenerator():
        await upfd.write(byts)
    await upfd.save()

Use a single UpLoad object to save multiple files:

async with await axonProxy.upload() as upfd:
    for fp in file_paths:
        # Assumes bytsgenerator yields bytes
        async for byts in bytsgenerator(fp):
            await upfd.write(byts)
        await upfd.save()

Returns:

An Upload manager object.

Return type:

UpLoadShare

async wants(sha256s)[source]

Get a list of sha256 values the axon does not have from an input list.

Parameters:

sha256s (list) – A list of sha256 values as bytes.

Returns:

A list of bytes containing the sha256 hashes the Axon does not have.

Return type:

list

async wget(url, params=None, headers=None, json=None, body=None, method='GET', ssl=True, timeout=None, proxy=None, ssl_opts=None)[source]

Stream a file download directly into the Axon.

Parameters:
  • url (str) – The URL to retrieve.

  • params (dict) – Additional parameters to add to the URL.

  • headers (dict) – Additional HTTP headers to add in the request.

  • json – A JSON body which is included with the request.

  • body – The body to be included in the request.

  • method (str) – The HTTP method to use.

  • ssl (bool) – Perform SSL verification.

  • timeout (int) – The timeout of the request, in seconds.

  • proxy (bool|str|null) – Use a specific proxy or disable proxy use.

  • ssl_opts (dict) – Additional SSL/TLS options.

Notes

The response body will be stored, regardless of the response code. The ok value in the response does not reflect that a status code, such as a 404, was encountered when retrieving the URL.

The ssl_opts dictionary may contain the following values:

{
    'verify': <bool> - Perform SSL/TLS verification. Is overridden by the ssl argument.
    'client_cert': <str> - PEM encoded full chain certificate for use in mTLS.
    'client_key': <str> - PEM encoded key for use in mTLS. Alternatively, can be included in client_cert.
}

The dictionary returned by this may contain the following values:

{
    'ok': <boolean> - False if there were exceptions retrieving the URL.
    'url': <str> - The URL retrieved (which could have been redirected). This is a url-decoded string.
    'code': <int> - The response code.
    'reason': <str> - The reason phrase for the HTTP status code.
    'mesg': <str> - An error message if there was an exception when retrieving the URL.
    'err': <tuple> - An error tuple if there was an exception when retrieving the URL.
    'headers': <dict> - The response headers as a dictionary.
    'size': <int> - The size in bytes of the response body.
    'hashes': {
        'md5': <str> - The MD5 hash of the response body.
        'sha1': <str> - The SHA1 hash of the response body.
        'sha256': <str> - The SHA256 hash of the response body.
        'sha512': <str> - The SHA512 hash of the response body.
    },
    'request': {
        'url': The request URL. This is a url-decoded string.
        'headers': The request headers.
        'method': The request method.
    }
    'history': A sequence of response bodies to track any redirects, not including hashes.
}

Returns:

An information dictionary containing the results of the request.

Return type:

dict

async wput(sha256, url, params=None, headers=None, method='PUT', ssl=True, timeout=None, proxy=None, ssl_opts=None)[source]
class synapse.axon.AxonFileHandler(application: Application, request: HTTPServerRequest, **kwargs: Any)[source]

Bases: AxonHandlerMixin, Handler

async getAxonInfo()[source]
class synapse.axon.AxonHandlerMixin[source]

Bases: object

getAxon()[source]

Get a reference to the Axon interface used by the handler.

class synapse.axon.AxonHttpBySha256InvalidV1(application: Application, request: HTTPServerRequest, **kwargs: Any)[source]

Bases: AxonFileHandler

async delete(sha256)[source]
async get