import os
import re
import sys
import json
import base64
import pprint
import argparse
import subprocess
import synapse.exc as s_exc
import synapse.data as s_data
import synapse.common as s_common
import synapse.lib.output as s_outp
import synapse.lib.certdir as s_certdir
import cryptography.x509 as c_x509
import cryptography.hazmat.primitives.serialization as c_serialization
[docs]
def checkCosign(outp):
args = ('cosign', 'version')
try:
proc = subprocess.run(args, capture_output=True)
proc.check_returncode()
except (FileNotFoundError, subprocess.CalledProcessError) as e: # pragma: no cover
outp.printf(f'Error calling {" ".join(args)}: {e}')
return False
data = '\n'.join((proc.stdout.decode(), proc.stderr.decode()))
if 'GitVersion' not in data:
outp.printf(f'Cannot find GitVersion in output: {data}')
outp.printf(data)
return False
vline = [line for line in data.splitlines() if line.startswith('GitVersion')][0]
if re.search('v2\\.[0-9]+\\.[0-9]+', vline):
outp.printf(f'Using Cosign with {vline}')
return True
outp.printf(f'Did not find cosign version v2.x.x in "{vline}"')
return False
[docs]
def getCosignSignature(outp, image):
args = ('cosign', 'download', 'signature', image)
try:
proc = subprocess.run(args, capture_output=True)
proc.check_returncode()
except (FileNotFoundError, subprocess.CalledProcessError) as e: # pragma: no cover
outp.printf(f'Error calling {" ".join(args)}: {e}')
return None
blob = proc.stdout
try:
sigd = json.loads(blob)
except json.JSONDecodeError as e:
outp.printf(f'Error decoding blob: {blob}: {e}')
return None
if not isinstance(sigd, dict):
outp.printf(f'Expected dictionary, got {sigd} from {blob}')
return None
return sigd
[docs]
def checkCRL(outp, sigd, certdir):
# Extract signing certificate; ensure that it is not revoked according to our CRL
byts = base64.b64decode(sigd.get('Cert', {}).get('Raw', ''))
try:
cert = c_x509.load_der_x509_certificate(byts)
pem_byts = cert.public_bytes(c_serialization.Encoding.PEM)
except Exception as e: # pragma: no cover
# Unwrap pyopenssl's exception_from_error_queue
outp.printf(f'Failed to load signature bytes: {e} {byts}')
return False
try:
certdir.valCodeCert(pem_byts)
except s_exc.BadCertVerify as e:
mesg = e.get('mesg')
if mesg:
mesg = f'Signature has invalid certificate: {mesg}'
else:
mesg = 'Signature has invalid certificate!'
outp.printf(mesg)
return False
# Return the pubkey bytes in PEM format
return cert.public_key().public_bytes(encoding=c_serialization.Encoding.PEM,
format=c_serialization.PublicFormat.SubjectPublicKeyInfo)
[docs]
def checkCosignSignature(outp, pubk_byts, image_to_verify):
with s_common.getTempDir() as dirn:
# Write certificate out
pubk_path = s_common.genpath(dirn, 'pubkey.pem')
with s_common.genfile(pubk_path) as fd:
fd.write(pubk_byts)
# Do the image verification
args = ('cosign', 'verify', "--rekor-url=''", '--insecure-ignore-sct', '--insecure-ignore-tlog',
'--key', pubk_path, image_to_verify)
try:
proc = subprocess.run(args=args, capture_output=True)
proc.check_returncode()
except subprocess.CalledProcessError as e: # pragma: no cover
outp.printf(f'Error calling {" ".join(args)}: {e}')
return None
blob = json.loads(proc.stdout.decode())
outp.printf(f'Cosign output:')
outp.printf(pprint.pformat(blob))
return True
[docs]
def main(argv, outp=s_outp.stdout): # pragma: no cover
pars = getArgParser()
opts = pars.parse_args(argv)
image_to_verify = opts.image
outp.printf(f'Verifying: {image_to_verify}')
if not checkCosign(outp):
outp.printf('Failed to confirm cosign v2.x.x is available.')
return 1
sigd = getCosignSignature(outp, image_to_verify)
if sigd is None:
outp.printf(f'Failed to get signature for {image_to_verify}')
return 1
if opts.certdir:
certpath = opts.certdir
else:
certpath = s_data.path('certs')
outp.printf(f'Loading certdir from {certpath}')
certdir = s_certdir.CertDir(path=(certpath,))
pubk_byts = checkCRL(outp, sigd, certdir)
if not pubk_byts:
outp.printf(f'CRL check failed for {image_to_verify}')
return 1
outp.printf('Verified certificate embedded in the signature.')
if not checkCosignSignature(outp, pubk_byts, image_to_verify):
outp.printf(f'Failed to verify: {image_to_verify}')
return 1
outp.printf(f'Verified: {image_to_verify}')
return 0
[docs]
def getArgParser(): # pragma: no cover
pars = argparse.ArgumentParser(description='Verify Docker images are signed by The Vertex Project.')
pars.add_argument('--certdir', '-c', action='store', default=None,
help='Alternative certdir to use for signature verification.')
pars.add_argument('image', help="Docker image to verify.")
return pars
if __name__ == '__main__': # pragma: no cover
sys.exit(main(sys.argv[1:]))