import os
import re
import sys
import copy
import gzip
import pprint
import asyncio
import argparse
import datetime
import tempfile
import textwrap
import traceback
import subprocess
import collections
import regex
import synapse.exc as s_exc
import synapse.common as s_common
import synapse.cortex as s_cortex
import synapse.lib.output as s_output
import synapse.lib.autodoc as s_autodoc
import synapse.lib.schemas as s_schemas
import synapse.lib.version as s_version
defstruct = (
('type', None),
('desc', ''),
('prs', ()),
)
SKIP_FILES = (
'.gitkeep',
'modelrefs',
)
version_regex = r'^v[0-9]\.[0-9]+\.[0-9]+((a|b|rc)[0-9]*)?$'
def _getCurrentCommit(outp: s_output.OutPut) -> str | None:
try:
ret = subprocess.run(['git', 'rev-parse', 'HEAD'],
capture_output=True,
timeout=15,
check=False,
text=True,
)
except Exception as e:
outp.printf(f'Error grabbing commit: {e}')
return
else:
commit = ret.stdout.strip()
assert commit
return commit
async def _getCurrentModl(outp: s_output.OutPut) -> dict:
with tempfile.TemporaryDirectory() as dirn:
conf = {'health:sysctl:checks': False}
async with await s_cortex.Cortex.anit(conf=conf, dirn=dirn) as core:
modl = await core.getModelDict()
# Reserialize modl so its consistent with the model on disk
modl = s_common.yamlloads(s_common.yamldump(modl))
return modl
[docs]
class ModelDiffer:
def __init__(self, current_model: dict, reference_model: dict):
self.cur_model = current_model
self.ref_model = reference_model
self.changes = {}
self.cur_iface_to_allifaces = collections.defaultdict(list)
for iface, info in self.cur_model.get('interfaces').items():
self.cur_iface_to_allifaces[iface] = [iface]
q = collections.deque(info.get('interfaces', ()))
while q:
_iface = q.popleft()
if _iface in self.cur_iface_to_allifaces[iface]:
continue
self.cur_iface_to_allifaces[iface].append(_iface)
q.extend(self.cur_model.get('interfaces').get(_iface).get('interfaces', ()))
self.cur_type2iface = collections.defaultdict(list)
for _type, tnfo in self.cur_model.get('types').items():
for iface in tnfo.get('info').get('interfaces', ()):
ifaces = self.cur_iface_to_allifaces[iface]
for _iface in ifaces:
if _iface not in self.cur_type2iface[_type]:
self.cur_type2iface[_type].append(_iface)
def _compareEdges(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
# Flatten the edges into structures that can be handled
_curv = {tuple(item[0]): item[1] for item in curv}
_oldv = {tuple(item[0]): item[1] for item in oldv}
curedges = set(_curv.keys())
oldedges = set(_oldv.keys())
new_edges = curedges - oldedges
del_edges = oldedges - curedges # This should generally not happen...
assert len(del_edges) == 0, 'A edge was removed from the data model!'
if new_edges:
changes['new_edges'] = {k: _curv.get(k) for k in new_edges}
updated_edges = collections.defaultdict(dict)
deprecated_edges = {}
for edge, curinfo in _curv.items():
if edge in new_edges:
continue
oldinfo = _oldv.get(edge)
if curinfo == oldinfo:
continue
if curinfo.get('deprecated') and not oldinfo.get('deprecated'):
deprecated_edges[edge] = curinfo
continue
# TODO - Support changes to the edges?
assert False, f'A change was found for the edge: {edge}'
if updated_edges:
changes['updated_edges'] = dict(updated_edges)
if deprecated_edges:
changes['deprecated_edges'] = deprecated_edges
return changes
def _compareForms(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
curforms = set(curv.keys())
oldforms = set(oldv.keys())
new_forms = curforms - oldforms
del_forms = oldforms - curforms # This should generally not happen...
assert len(del_forms) == 0, 'A form was removed from the data model!'
if new_forms:
changes['new_forms'] = {k: curv.get(k) for k in new_forms}
updated_forms = collections.defaultdict(dict)
for form, curinfo in curv.items():
if form in new_forms:
continue
oldinfo = oldv.get(form)
if curinfo == oldinfo:
continue
# Check for different properties
nprops = curinfo.get('props')
oprops = oldinfo.get('props')
new_props = set(nprops.keys()) - set(oprops.keys())
del_props = set(oprops.keys()) - set(nprops.keys()) # This should generally not happen...
assert len(del_props) == 0, 'A form was removed from the data model!'
if new_props:
updated_forms[form]['new_properties'] = {prop: nprops.get(prop) for prop in new_props}
np_noiface = {}
ifaces = self.cur_type2iface[form]
for prop in new_props:
# TODO record raw new_props to make bulk edits possible
is_ifaceprop = False
for iface in ifaces:
# Is the prop in the new_interfaces or updated_interfaces lists?
new_iface = self.changes.get('interfaces').get('new_interfaces', {}).get(iface)
if new_iface and prop in new_iface.get('props', {}):
is_ifaceprop = True
break
upt_iface = self.changes.get('interfaces').get('updated_interfaces', {}).get(iface)
if upt_iface and prop in upt_iface.get('new_properties', {}):
is_ifaceprop = True
break
if is_ifaceprop:
continue
np_noiface[prop] = nprops.get(prop)
if np_noiface:
updated_forms[form]['new_properties_no_interfaces'] = np_noiface
updated_props = {}
updated_props_noiface = {}
deprecated_props = {}
deprecated_props_noiface = {}
for prop, cpinfo in nprops.items():
if prop in new_props:
continue
opinfo = oprops.get(prop)
if cpinfo == opinfo:
continue
# Deprecation has a higher priority than updated type information
if cpinfo.get('deprecated') and not opinfo.get('deprecated'):
# A deprecated property could be present on an updated iface
deprecated_props[prop] = cpinfo
is_ifaceprop = False
for iface in self.cur_type2iface[form]:
upt_iface = self.changes.get('interfaces').get('updated_interfaces', {}).get(iface)
if upt_iface and prop in upt_iface.get('deprecated_properties', {}):
is_ifaceprop = True
break
if is_ifaceprop:
continue
deprecated_props_noiface[prop] = cpinfo
continue
okeys = set(opinfo.keys())
nkeys = set(cpinfo.keys())
if nkeys - okeys:
# We've added a key to the prop def.
raise s_exc.NoSuchImpl(mesg='Have not implemented support for a prop def having a key added')
if okeys - nkeys:
# We've removed a key from the prop def.
updated_props[prop] = {'type': 'delkey', 'keys': list(okeys - nkeys)}
continue
# Check if type change happened, we'll want to document that.
ctyp = cpinfo.get('type')
otyp = opinfo.get('type')
if ctyp == otyp:
continue
updated_props[prop] = {'type': 'type_change', 'new_type': ctyp, 'old_type': otyp}
is_ifaceprop = False
for iface in self.cur_type2iface[form]:
upt_iface = self.changes.get('interfaces').get('updated_interfaces', {}).get(iface)
if upt_iface and prop in upt_iface.get('updated_properties', {}):
is_ifaceprop = True
break
if is_ifaceprop:
continue
updated_props_noiface[prop] = {'type': 'type_change', 'new_type': ctyp, 'old_type': otyp}
if updated_props:
updated_forms[form]['updated_properties'] = updated_props
if updated_props_noiface:
updated_forms[form]['updated_properties_no_interfaces'] = updated_props_noiface
if deprecated_props:
updated_forms[form]['deprecated_properties'] = deprecated_props
if deprecated_props_noiface:
updated_forms[form]['deprecated_properties_no_interfaces'] = deprecated_props_noiface
if updated_forms:
changes['updated_forms'] = dict(updated_forms)
return changes
def _compareIfaces(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
curfaces = set(curv.keys())
oldfaces = set(oldv.keys())
new_faces = curfaces - oldfaces
del_faces = oldfaces - curfaces # This should generally not happen...
assert len(del_faces) == 0, 'An interface was removed from the data model!'
if new_faces:
nv = {}
for iface in new_faces:
k = copy.deepcopy(curv.get(iface))
# Rewrite props into a dictionary for easier lookup later
k['props'] = {item[0]: {'type': item[1], 'props': item[2]} for item in k['props']}
nv[iface] = k
changes['new_interfaces'] = nv
updated_interfaces = collections.defaultdict(dict)
for iface, curinfo in curv.items():
if iface in new_faces:
continue
oldinfo = oldv.get(iface)
# Did the interface inheritance change?
if curinfo.get('interfaces') != oldinfo.get('interfaces'):
updated_interfaces[iface] = {'updated_interfaces': {'curv': curinfo.get('interfaces'),
'oldv': oldinfo.get('interfaces')}}
# Did the interface have a property definition change?
nprops = curinfo.get('props')
oprops = oldinfo.get('props')
# Convert props to dictionary
nprops = {item[0]: {'type': item[1], 'props': item[2]} for item in nprops}
oprops = {item[0]: {'type': item[1], 'props': item[2]} for item in oprops}
new_props = set(nprops.keys()) - set(oprops.keys())
del_props = set(oprops.keys()) - set(nprops.keys()) # This should generally not happen...
assert len(del_props) == 0, f'A prop was removed from the iface {iface}'
if new_props:
updated_interfaces[iface]['new_properties'] = {prop: nprops.get(prop) for prop in new_props}
updated_props = {}
deprecated_props = {}
for prop, cpinfo in nprops.items():
if prop in new_props:
continue
opinfo = oprops.get(prop)
if cpinfo == opinfo:
continue
if cpinfo.get('props').get('deprecated') and not opinfo.get('props').get('deprecated'):
deprecated_props[prop] = cpinfo
continue
okeys = set(opinfo.keys())
nkeys = set(cpinfo.keys())
if nkeys - okeys:
# We've added a key to the prop def.
raise s_exc.NoSuchImpl(mesg='Have not implemented support for a prop def having a key added')
if okeys - nkeys:
# We've removed a key from the prop def.
updated_props[prop] = {'type': 'delkey', 'keys': list(okeys - nkeys)}
continue
# Check if type change happened, we'll want to document that.
ctyp = cpinfo.get('type')
otyp = opinfo.get('type')
if ctyp == otyp:
continue
updated_props[prop] = {'type': 'type_change', 'new_type': ctyp, 'old_type': otyp}
if updated_props:
updated_interfaces[iface]['updated_properties'] = updated_props
if deprecated_props:
updated_interfaces[iface]['deprecated_properties'] = deprecated_props
changes['updated_interfaces'] = dict(updated_interfaces)
return changes
def _compareTagprops(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
raise NotImplementedError('_compareTagprops')
def _compareTypes(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
curtypes = set(curv.keys())
oldtypes = set(oldv.keys())
new_types = curtypes - oldtypes
del_types = oldtypes - curtypes # This should generally not happen...
assert len(del_types) == 0, 'A type was removed from the data model!'
if new_types:
changes['new_types'] = {k: curv.get(k) for k in new_types}
updated_types = collections.defaultdict(dict)
deprecated_types = collections.defaultdict(dict)
for _type, curinfo in curv.items():
if _type in new_types:
continue
oldinfo = oldv.get(_type)
if curinfo == oldinfo:
continue
cnfo = curinfo.get('info')
onfo = oldinfo.get('info')
if cnfo.get('deprecated') and not onfo.get('deprecated'):
deprecated_types[_type] = curinfo
continue
if cnfo.get('interfaces') != onfo.get('interfaces'):
updated_types[_type]['updated_interfaces'] = {'curv': cnfo.get('interfaces'),
'oldv': onfo.get('interfaces'),
}
if curinfo.get('opts') != oldinfo.get('opts'):
updated_types[_type]['updated_opts'] = {'curv': curinfo.get('opts'),
'oldv': oldinfo.get('opts'),
}
if updated_types:
changes['updated_types'] = dict(updated_types)
if deprecated_types:
changes['deprecated_types'] = dict(deprecated_types)
return changes
def _compareUnivs(self, curv, oldv, outp: s_output.OutPut) -> dict:
changes = {}
if curv == oldv:
return changes
raise NotImplementedError('_compareUnivs')
[docs]
def diffModl(self, outp: s_output.OutPut) -> dict | None:
if self.changes:
return self.changes
# These are order sensitive due to interface knowledge being required in order
# to deconflict downstream changes on forms.
known_keys = {
'interfaces': self._compareIfaces,
'types': self._compareTypes,
'forms': self._compareForms,
'tagprops': self._compareTagprops,
'edges': self._compareEdges,
'univs': self._compareUnivs,
}
all_keys = set(self.cur_model.keys()).union(self.ref_model.keys())
for key, func in known_keys.items():
self.changes[key] = func(self.cur_model.get(key),
self.ref_model.get(key),
outp)
all_keys.remove(key)
if all_keys:
outp.printf(f'ERROR: Unknown model key found: {all_keys}')
return
return self.changes
def _getModelFile(fp: str) -> dict | None:
with s_common.genfile(fp) as fd:
bytz = fd.read()
large_bytz = gzip.decompress(bytz)
ref_modl = s_common.yamlloads(large_bytz)
return ref_modl
[docs]
async def gen(opts: argparse.Namespace,
outp: s_output.OutPut):
name = opts.name
if name is None:
name = f'{s_common.guid()}.yaml'
fp = s_common.genpath(opts.cdir, name)
data = dict(defstruct)
data['type'] = opts.type
data['desc'] = opts.desc
if opts.pr:
data['prs'] = [opts.pr]
if opts.verbose:
outp.printf('Validating data against schema')
s_schemas._reqChanglogSchema(data)
if opts.verbose:
outp.printf('Saving the following information:')
outp.printf(s_common.yamldump(data).decode())
s_common.yamlsave(data, fp)
outp.printf(f'Saved changelog entry to {fp=}')
if opts.add:
if opts.verbose:
outp.printf('Adding file to git staging')
argv = ['git', 'add', fp]
ret = subprocess.run(argv, capture_output=True)
if opts.verbose:
outp.printf(f'stddout={ret.stdout}')
outp.printf(f'stderr={ret.stderr}')
ret.check_returncode()
return 0
def _gen_model_rst(version, model_ref, changes, current_model, outp: s_output.OutPut, width=80) -> s_autodoc.RstHelp:
rst = s_autodoc.RstHelp()
rst.addHead(f'{version} Model Updates', link=f'.. _{model_ref}:')
rst.addLines(f'The following model updates were made during the ``{version}`` Synapse release.')
if new_interfaces := changes.get('interfaces').get('new_interfaces'):
rst.addHead('New Interfaces', lvl=1)
for interface, info in new_interfaces.items():
rst.addLines(f'``{interface}``')
rst.addLines(*textwrap.wrap(info.get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width))
rst.addLines('\n')
# Deconflict new_forms vs new_types -> do not add types which appear in new_forms.
new_forms = changes.get('forms').get('new_forms', {})
new_types = changes.get('types').get('new_types', {})
types_to_document = {k: v for k, v in new_types.items() if k not in new_forms}
if types_to_document:
rst.addHead('New Types', lvl=1)
for _type, info in types_to_document.items():
rst.addLines(f'``{_type}``')
rst.addLines(*textwrap.wrap(info.get('info').get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width))
rst.addLines('\n')
if new_forms:
rst.addHead('New Forms', lvl=1)
for form, info in new_forms.items():
rst.addLines(f'``{form}``')
# Pull the form doc from the current model directly. In the event of an existing
# type being turned into a form + then reindexed, it would not show up in the
# type diff, so we can't rely on the doc being present there.
doc = current_model.get('types').get(form).get('info').get('doc')
rst.addLines(*textwrap.wrap(doc, initial_indent=' ', subsequent_indent=' ',
width=width))
rst.addLines('\n')
# Check for new properties
updated_forms = changes.get('forms').get('updated_forms', {})
new_props = []
for form, info in updated_forms.items():
if 'new_properties' in info:
new_props.append((form, info))
if new_props:
rst.addHead('New Properties', lvl=1)
new_props.sort(key=lambda x: x[0])
for form, info in new_props:
rst.addLines(f'``{form}``')
new_form_props = list(info.get('new_properties').items())
if len(new_form_props) > 1:
rst.addLines(' The form had the following properties added to it:', '\n')
new_form_props.sort(key=lambda x: x[0])
for name, info in new_form_props:
lines = [
f' ``{name}``',
*textwrap.wrap(info.get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
else:
name, info = new_form_props[0]
lines = [
' The form had the following property added to it:',
'\n'
f' ``{name}``',
*textwrap.wrap(info.get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
# Updated interfaces
if updated_interfaces := changes.get('interfaces').get('updated_interfaces', {}):
upd_ifaces = list(updated_interfaces.items())
upd_ifaces.sort(key=lambda x: x[0])
rst.addHead('Updated Interfaces', lvl=1)
for iface, info in upd_ifaces:
lines = [f'``{iface}``',
]
for key, valu in sorted(info.items(), key=lambda x: x[0]):
if key == 'deprecated_properties':
for prop, pnfo in sorted(valu.items(), key=lambda x: x[0]):
mesg = f'The interface property ``{prop}`` has been deprecated.'
lines.extend(textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width))
lines.append('\n')
elif key == 'new_properties':
for prop, pnfo in sorted(valu.items(), key=lambda x: x[0]):
mesg = f'The property ``{prop}`` has been added to the interface.'
lines.extend(textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width))
lines.append('\n')
elif key == 'updated_properties':
for prop, pnfo in sorted(valu.items(), key=lambda x: x[0]):
ptyp = pnfo.get('type')
if ptyp == 'type_change':
mesg = f'The property ``{prop}`` has been modified from {pnfo.get("old_type")}' \
f' to {pnfo.get("new_type")}.'
elif ptyp == 'delkey':
mesg = f'The property ``{prop}`` had the ``{pnfo.get("keys")}`` keys removed from its definition.'
else:
raise s_exc.NoSuchImpl(mesg=f'pnfo.type={ptyp} not supported.')
lines.extend(textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width))
lines.append('\n')
else: # pragma: no cover
outp.printf(f'Unknown key: {key=} {valu=}')
raise s_exc.SynErr(mesg=f'Unknown updated interface key: {key=} {valu=}')
rst.addLines(*lines)
# Updated types
if updated_types := changes.get('types').get('updated_types', {}):
upd_types = list(updated_types.items())
upd_types.sort(key=lambda x: x[0])
rst.addHead('Updated Types', lvl=1)
for _type, info in upd_types:
lines = [f'``{_type}``',
]
for key, valu in sorted(info.items(), key=lambda x: x[0]):
if key == 'updated_interfaces':
mesg = f'The type interface has been modified from {valu.get("oldv")}' \
f' to {valu.get("curv")}.'
lines.extend(textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width))
lines.append('\n')
elif key == 'updated_opts':
mesg = f'The type has been modified from {valu.get("oldv")}' \
f' to {valu.get("curv")}.'
lines.extend(textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width))
lines.append('\n')
else: # pragma: no cover
outp.printf(f'Unknown key: {key=} {valu=}')
raise s_exc.SynErr(mesg=f'Unknown updated type key: {key=} {valu=}')
rst.addLines(*lines)
# Updated Forms
# We don't really have a "updated forms" to display since the delta for forms data is really property
# deltas covered elsewhere.
# Updated Properties
upd_props = []
for form, info in updated_forms.items():
if 'updated_properties' in info:
upd_props.append((form, info))
if upd_props:
rst.addHead('Updated Properties', lvl=1)
upd_props.sort(key=lambda x: x[0])
for form, info in upd_props:
rst.addLines(f'``{form}``')
upd_form_props = list(info.get('updated_properties').items())
if len(upd_form_props) > 1:
rst.addLines(' The form had the following properties updated:', '\n')
upd_form_props.sort(key=lambda x: x[0])
for prop, pnfo in upd_form_props:
ptyp = pnfo.get('type')
if ptyp == 'type_change':
mesg = f'The property ``{prop}`` has been modified from {pnfo.get("old_type")}' \
f' to {pnfo.get("new_type")}.'
elif ptyp == 'delkey':
mesg = f'The property ``{prop}`` had the ``{pnfo.get("keys")}`` keys removed from its definition.'
else:
raise s_exc.NoSuchImpl(mesg=f'pnfo.type={ptyp} not supported.')
lines = [
*textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
else:
prop, pnfo = upd_form_props[0]
ptyp = pnfo.get('type')
if ptyp == 'type_change':
mesg = f'The property ``{prop}`` has been modified from {pnfo.get("old_type")}' \
f' to {pnfo.get("new_type")}.'
elif ptyp == 'delkey':
mesg = f'The property ``{prop}`` had the ``{pnfo.get("keys")}`` keys removed from its definition.'
else:
raise s_exc.NoSuchImpl(mesg=f'pnfo.type={ptyp} not supported.')
lines = [
' The form had the following property updated:',
'\n',
*textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
# Light Edges
if new_edges := changes.get('edges').get('new_edges'):
new_edges = list(new_edges.items())
new_edges.sort(key=lambda x: x[0][1])
rst.addHead('Light Edges', lvl=1)
for (n1, name, n2), info in new_edges:
if n1 is not None and n2 is not None:
mesg = f'''When used with a ``{n1}`` and an ``{n2}`` node, the edge indicates {info.get('doc')}'''
elif n1 is None and n2 is not None:
mesg = f'''When used with a ``{n2}`` target node, the edge indicates {info.get('doc')}'''
elif n1 is not None and n2 is None:
mesg = f'''When used with a ``{n1}`` node, the edge indicates {info.get('doc')}'''
else:
mesg = info.get('doc')
rst.addLines(
f'``{name}``',
*textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ', width=width),
'\n',
)
# Deprecated Interfaces
# TODO Support deprecated interfaces!
# Deprecated Types
# Deconflict deprecated forms vs deprecated_types, so we do not
# not call out types which are also forms in the current model.
deprecated_types = changes.get('types').get('deprecated_types', {})
deprecated_forms = {k: v for k, v in deprecated_types.items() if k in current_model.get('forms')}
deprecated_types = {k: v for k, v in deprecated_types.items() if k not in deprecated_forms}
if deprecated_types:
rst.addHead('Deprecated Types', lvl=1)
rst.addLines('The following types have been marked as deprecated:', '\n')
for _type, info in deprecated_types.items():
rst.addLines(
f'* ``{_type}``',
)
rst.addLines('\n')
# Deprecated Forms
if deprecated_forms:
rst.addHead('Deprecated Types', lvl=1)
rst.addLines('The following forms have been marked as deprecated:', '\n')
for _type, info in deprecated_forms.items():
rst.addLines(
f'* ``{_type}``',
)
rst.addLines('\n')
# Deprecated Properties
dep_props = []
for form, info in updated_forms.items():
if 'deprecated_properties' in info:
dep_props.append((form, info))
if dep_props:
rst.addHead('Deprecated Properties', lvl=1)
dep_props.sort(key=lambda x: x[0])
for form, info in dep_props:
rst.addLines(f'``{form}``')
dep_form_props = list(info.get('deprecated_properties').items())
if len(dep_form_props) > 1:
rst.addLines(' The form had the following properties deprecated:', '\n')
dep_form_props.sort(key=lambda x: x[0])
for name, info in dep_form_props:
lines = [
f' ``{name}``',
*textwrap.wrap(info.get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
else:
name, info = dep_form_props[0]
lines = [
' The form had the following property deprecated:',
'\n'
f' ``{name}``',
*textwrap.wrap(info.get('doc'), initial_indent=' ', subsequent_indent=' ',
width=width),
'\n'
]
rst.addLines(*lines)
if dep_edges := changes.get('edges').get('deprecated_edges'):
rst.addHead('Deprecated Edges', lvl=1)
for (n1, name, n2), info in dep_edges.items():
if n1 is not None and n2 is not None:
mesg = f'''The edge has been deprecated when used with a ``{n1}`` and an ``{n2}`` node. {info.get('doc')}'''
elif n1 is None and n2 is not None:
mesg = f'''The edge has been deprecated when used with a ``{n2}`` target node. {info.get('doc')}'''
elif n1 is not None and n2 is None:
mesg = f'''The edge has been deprecated when used with a ``{n1}`` node. {info.get('doc')}'''
else:
mesg = f'''The edge has been deprecated. {info.get('doc')}'''
rst.addLines(
f'``{name}``',
*textwrap.wrap(mesg, initial_indent=' ', subsequent_indent=' ', width=width),
'\n',
)
return rst
[docs]
async def model(opts: argparse.Namespace,
outp: s_output.OutPut):
if opts.save:
modl = await _getCurrentModl(outp)
dirn = s_common.gendir(opts.cdir, 'modelrefs')
current_commit = _getCurrentCommit(outp)
if not current_commit:
return 1
wrapped_modl = {
'model': modl,
'commit': current_commit,
'version': s_version.version,
}
fp = s_common.genpath(dirn, f'model_{s_version.verstring}_{current_commit}.yaml.gz')
with s_common.genfile(fp) as fd:
fd.truncate(0)
bytz = s_common.yamldump(wrapped_modl)
small_bytz = gzip.compress(bytz)
_ = fd.write(small_bytz)
outp.printf(f'Saved model to {fp}')
return 0
if opts.compare:
ref_modl = _getModelFile(opts.compare)
if opts.to:
to_modl = _getModelFile(opts.to)
modl = to_modl.get('model')
outp.printf(f'Comparing {to_modl.get("version")} - {to_modl.get("commit")} vs {ref_modl.get("version")} - {ref_modl.get("commit")}')
else:
modl = await _getCurrentModl(outp)
outp.printf(f'Comparing current model vs {ref_modl.get("version")} - {ref_modl.get("commit")}')
differ = ModelDiffer(modl, ref_modl.get('model'))
changes = differ.diffModl(outp)
for line in pprint.pformat(changes).splitlines(keepends=False):
outp.printf(line)
return 0
[docs]
async def main(argv, outp=None):
if outp is None:
outp = s_output.OutPut()
pars = makeargparser()
opts = pars.parse_args(argv)
if opts.git_dir_check:
if not os.path.exists(os.path.join(os.getcwd(), '.git')):
outp.printf('Current working directory must be the root of the repository.')
return 1
if opts.verbose:
outp.printf(f'{opts=}')
try:
return await opts.func(opts, outp)
except Exception as e:
outp.printf(f'Error running {opts.func}: {traceback.format_exc()}')
return 1
[docs]
def makeargparser():
desc = '''Command line tool to manage changelog entries.
This tool and any data formats associated with it may change at any time.
'''
pars = argparse.ArgumentParser('synapse.tools.changelog', description=desc)
subpars = pars.add_subparsers(required=True,
title='subcommands',
dest='cmd', )
gen_pars = subpars.add_parser('gen', help='Generate a new changelog entry.')
gen_pars.set_defaults(func=gen)
gen_pars.add_argument('-t', '--type', required=True, choices=list(s_schemas._changelogTypes.keys()),
help='The changelog type.')
gen_pars.add_argument('desc', type=str,
help='The description to populate the initial changelog entry with.', )
gen_pars.add_argument('-p', '--pr', type=int, default=False,
help='PR number associated with the changelog entry.')
gen_pars.add_argument('-a', '--add', default=False, action='store_true',
help='Add the newly created file to the current git staging area.')
# Hidden name override. Mainly for testing.
gen_pars.add_argument('-n', '--name', default=None, type=str,
help=argparse.SUPPRESS)
format_pars = subpars.add_parser('format', help='Format existing files into a RST block.')
format_pars.set_defaults(func=format)
mux_prs = format_pars.add_mutually_exclusive_group()
mux_prs.add_argument('--hide-prs', default=False, action='store_true',
help='Hide PR entries.')
mux_prs.add_argument('--enforce-prs', default=False, action='store_true',
help='Enforce PRs list to be populated with at least one number.', )
format_pars.add_argument('--prs-from-git', default=False, action='store_true',
help='Attempt to populate any PR numbers from a given files commit history.')
format_pars.add_argument('-w', '--width', help='Maximum column width to wrap descriptions at.',
default=79, type=int)
format_pars.add_argument('--version', required=True, action='store', type=str,
help='Version number')
format_pars.add_argument('-d', '--date', action='store', type=str,
help='Date to use with the changelog entry')
format_pars.add_argument('-r', '--rm', default=False, action='store_true',
help='Stage the changelog files as deleted files in git.')
format_pars.add_argument('-m', '--model-ref', default=None, action='store', type=str,
help='Baseline model to use when generating model deltas. This is normally the previous releases model file.')
format_pars.add_argument('--model-current', default=None, action='store',
help='Optional model file to use as a reference as the current model.')
format_pars.add_argument('--model-doc-dir', default=None, action='store',
help='Directory to write the model changes too.')
model_pars = subpars.add_parser('model', help='Helper for working with the Cortex data model.')
model_pars.set_defaults(func=model)
mux_model = model_pars.add_mutually_exclusive_group(required=True)
mux_model.add_argument('-s', '--save', action='store_true', default=False,
help='Save a copy of the current model to a file.')
mux_model.add_argument('-c', '--compare', action='store', default=None,
help='Model to compare the current model against. Useful for debugging modl diff functionality.'
)
model_pars.add_argument('-t', '--to', action='store', default=None,
help='The model file to compare against. Will not use current model if specified.')
for p in (gen_pars, format_pars, model_pars):
p.add_argument('-v', '--verbose', default=False, action='store_true',
help='Enable verbose output')
p.add_argument('--cdir', default='./changes', action='store',
help='Directory of changelog files.')
p.add_argument('--disable-git-dir-check', dest='git_dir_check', default=True, action='store_false',
help=argparse.SUPPRESS)
return pars
if __name__ == '__main__': # pragma: no cover
sys.exit(asyncio.run(main(sys.argv[1:], s_output.stdout)))