| #!/usr/bin/env python3 |
| # Copyright (C) 2020-2021 by the Linux Foundation |
| # SPDX-License-Identifier: MIT-0 |
| import sys |
| import os |
| import base64 |
| import email.utils |
| import re |
| import subprocess |
| import hashlib |
| import urllib.parse |
| import logging |
| |
| import tempfile |
| |
| from typing import Tuple, Optional |
| |
# Name of the message header that carries the developer signature.
DEVSIG_HDR = b'X-Developer-Signature'
# Lower-cased names of the headers that every signature is required to cover.
REQ_HDRS = [
    b'from',
    b'subject',
    b'date',
    b'message-id',
]

# Module-level logger; handlers are attached by the script entry point.
logger = logging.getLogger(__name__)
| |
| |
def _run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run an external command and collect its exit status and output.

    The child process is always given pipes for stdin, stdout and stderr.
    When ``stdin`` is provided it is fed to the child's standard input.

    Returns a ``(returncode, stdout, stderr)`` tuple; both streams are bytes.
    """
    proc = subprocess.Popen(
        cmdargs,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    stdout_data, stderr_data = proc.communicate(input=stdin)
    return proc.returncode, stdout_data, stderr_data
| |
| |
def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run ``gpg`` with the given arguments appended to a fixed set of options.

    Every invocation is prefixed with ``--batch``, ``--no-auto-key-retrieve``
    and ``--no-auto-check-trustdb``.

    Returns the ``(returncode, stdout, stderr)`` tuple from ``_run_command``.
    """
    base_cmd = ['gpg', '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
    return _run_command(base_cmd + cmdargs, stdin)
| |
| |
def check_gpg_status(status: bytes) -> Tuple[bool, bool, bool]:
    """Scan GnuPG ``--status-fd`` output for signature status lines.

    Returns a ``(good, valid, trusted)`` tuple where:
      * ``good`` is True when a GOODSIG line is present,
      * ``valid`` is True when a VALIDSIG line is present,
      * ``trusted`` is True when a TRUST_FULLY or TRUST_ULTIMATE line is present.
    """
    def seen(pattern: bytes) -> bool:
        # Status lines start at the beginning of a line, hence re.M
        return re.search(pattern, status, flags=re.M) is not None

    good = seen(rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$')
    valid = seen(rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)')
    trusted = seen(rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)')
    return good, valid, trusted
| |
| |
def get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
    """Run ``git mailinfo`` on a raw message and return what it extracted.

    The message is fed to ``git mailinfo --encoding=utf-8`` on stdin, with the
    commit-message and patch outputs written to files in a temporary directory.

    Returns a ``(message, patch, info)`` tuple of bytes, where ``info`` is
    whatever git-mailinfo printed on stdout.

    Exits the process with status 1 if git-mailinfo does not succeed.
    """
    with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
        mf = os.path.join(td, 'm')
        pf = os.path.join(td, 'p')
        cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf]
        ecode, out, err = _run_command(cmdargs, stdin=payload)
        # A negative return code means the child was killed by a signal, which
        # is just as much of a failure as a positive exit status.
        if ecode != 0:
            logger.critical('FAILED : Failed running git-mailinfo:')
            # stderr is not guaranteed to be valid UTF-8; never crash while reporting
            logger.critical(err.decode(errors='replace'))
            sys.exit(1)
        with open(mf, 'rb') as mfh:
            m = mfh.read()
        with open(pf, 'rb') as pfh:
            p = pfh.read()
    return m, p, out
| |
| |
def load_message(msgfile: str) -> Tuple[list, bytes]:
    """Read a message file and split it into raw headers and payload.

    Returns a ``(headers, payload)`` tuple. ``headers`` is a list of bytes,
    one entry per header, with folded continuation lines attached to their
    header using CRLF. ``payload`` is every line after the first blank line,
    joined with CRLF and terminated by a final CRLF.

    Exits the process with status 1 if the file starts with a continuation line.
    """
    # Python's email package is deliberately not used here: any processing of
    # the contents could lead to a different hash being generated.
    hdr_lines = list()
    body_lines = list()
    seen_blank = False
    with open(msgfile, 'rb') as fh:
        logger.info('MSGSRC : %s', msgfile)
        for raw in fh:
            # drop whatever line terminator the file used
            line = re.sub(rb'[\r\n]*$', b'', raw)
            if seen_blank:
                body_lines.append(line)
            elif not line:
                # first empty line separates the headers from the payload
                seen_blank = True
            elif line[:1] in (b'\t', b' '):
                # leading whitespace marks a folded (wrapped) header
                if not hdr_lines:
                    # nothing to attach the continuation to
                    logger.critical('Not valid RFC2822 message')
                    sys.exit(1)
                hdr_lines[-1] += b'\r\n' + line
            else:
                hdr_lines.append(line)

    return hdr_lines, b'\r\n'.join(body_lines) + b'\r\n'
| |
| |
def get_mailinfo_message(oheaders: list, opayload: bytes, want_hdrs: list, maxlen: Optional[int]) -> Tuple[list, bytes]:
    """Canonicalize a message with ``git mailinfo`` before hashing.

    Args:
        oheaders: original raw headers, one bytes entry per header.
        opayload: original raw payload.
        want_hdrs: lower-cased header names to keep in the result.
        maxlen: if truthy, the canonical payload is cut to this many bytes.

    Returns:
        ``(cheaders, cpayload)``: the wanted headers, with From/Subject/Date
        values replaced by what git-mailinfo reported, and the canonical
        payload (mailinfo message + patch, CRLF line endings, no trailing
        blank lines).
    """
    # We pre-canonicalize using git mailinfo
    origmsg = b'\r\n'.join(oheaders) + b'\r\n\r\n' + opayload
    m, p, i = get_git_mailinfo(origmsg)
    # We don't use python's email message because we don't want any processing
    # done on the contents that may result in a wrong hash being generated.
    # Generate a new payload using m and p and canonicalize with \r\n endings,
    # trimming any excess blank lines ("simple" DKIM canonicalization).
    body_lines = (m + p).rstrip(b'\r\n').split(b'\n')
    cpayload = b''.join(line.rstrip(b'\r\n') + b'\r\n' for line in body_lines)

    if maxlen:
        logger.debug('Limiting payload length to %d bytes', maxlen)
        cpayload = cpayload[:maxlen]

    # Parse the "Name: value" lines git-mailinfo printed on stdout
    idata = dict()
    for line in i.rstrip(b'\r\n').split(b'\n'):
        name, sep, value = line.partition(b':')
        if not sep:
            # blank or malformed line (e.g. empty mailinfo output)
            continue
        idata[name.lower()] = value.strip()

    # Now substituting headers returned by mailinfo
    cheaders = list()
    for oheader in oheaders:
        left, sep, right = oheader.partition(b':')
        if not sep:
            # not a "name: value" header, so it cannot be a wanted one
            continue
        lleft = left.lower()
        if lleft not in want_hdrs:
            continue
        if lleft == b'from':
            right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
        elif lleft == b'subject':
            right = b' ' + idata.get(b'subject', b'')
        elif lleft == b'date':
            right = b' ' + idata.get(b'date', b'')
        cheaders.append(left + b':' + right)

    return cheaders, cpayload
| |
| |
def splitter(longstr: bytes, limit: int = 78) -> bytes:
    """Break a long byte string into space-separated pieces.

    While the remaining data is longer than ``limit`` a piece is cut off the
    front: the first piece is ``limit - 2`` bytes long, every later one is
    ``limit`` bytes. Whatever is left over becomes the final piece.

    Returns the pieces joined with single spaces.
    """
    pieces = []
    remaining = longstr
    # only the very first cut is two bytes shorter than the limit
    width = limit - 2
    while len(remaining) > limit:
        pieces.append(remaining[:width])
        remaining = remaining[width:]
        width = limit
    pieces.append(remaining)
    return b' '.join(pieces)
| |
| |
def folder(longhdr: bytes, limit: int = 78) -> bytes:
    """Fold a long header at spaces into CRLF-separated lines.

    The header is split on single spaces and the chunks are packed into
    lines. A new line is started when the current line plus the next chunk
    would exceed ``limit``; continuation lines start with a space. A chunk
    that is itself longer than ``limit`` is put on a line of its own, as-is.

    The length check does not count the joining space, so a folded line may
    end up one byte longer than ``limit``.

    Returns the folded header, lines joined with CRLF (no trailing CRLF).
    """
    lines = []
    line = b''
    for chunk in longhdr.split(b' '):
        if len(line) + len(chunk) > limit:
            # Only flush a line that has content: emitting an empty line here
            # would put a blank line in the middle of the header block, which
            # terminates the headers.
            if line:
                lines.append(line)
            line = b''
            if len(chunk) > limit:
                # the chunk itself is longer than limit, so append it as-is;
                # only continuation lines need the leading space
                lines.append(b' ' + chunk if lines else chunk)
                continue
        if not lines and not line:
            # We're at the very start, so no need to prepend with ' '
            line += chunk
        else:
            line += b' ' + chunk
    if line:
        lines.append(line)

    return b'\r\n'.join(lines)
| |
| |
def get_git_toplevel(gitdir: str = None) -> str:
    """Ask git for the top-level directory of the working tree.

    Runs ``git rev-parse --show-toplevel``, adding ``--git-dir <gitdir>`` when
    ``gitdir`` is given.

    Returns the decoded, stripped output, or an empty string when git exits
    with a non-zero status.
    """
    git_dir_opts = ['--git-dir', gitdir] if gitdir else []
    cmdargs = ['git'] + git_dir_opts + ['rev-parse', '--show-toplevel']
    ecode, out, _err = _run_command(cmdargs)
    if ecode != 0:
        return ''
    return out.decode().strip()
| |
| |
def get_parts_from_header(hval: bytes) -> dict:
    """Parse a ``k=v; k=v`` style header value into a dictionary.

    All whitespace is removed first, then the value is split on ``;`` and
    each piece on its first ``=``. Pieces without ``=`` are ignored.

    Returns a dict mapping tag names (str) to their raw values (bytes).
    """
    compact = re.sub(rb'\s*', b'', hval)
    triples = (piece.partition(b'=') for piece in compact.split(b';'))
    # an empty separator means the piece had no "=" at all
    return {name.decode(): value for name, sep, value in triples if sep}
| |
| |
def dkim_canonicalize_header(hval: bytes) -> bytes:
    """Apply DKIM "relaxed" canonicalization to a header value.

    Only the relaxed algorithm is implemented for headers. Returns the
    canonical value terminated by a single CRLF.
    """
    # Unfold continuation lines: every CR and LF inside the value is dropped
    # (the terminating CRLF is added back at the very end).
    unfolded = re.sub(rb'[\r\n]', b'', hval)
    # Each run of whitespace, including whitespace around a former folding
    # boundary, collapses into one space.
    collapsed = re.sub(rb'\s+', b' ', unfolded)
    # Whitespace at either end of the value is removed: this covers both the
    # end of the unfolded value and whatever followed the colon separator.
    return collapsed.strip() + b'\r\n'
| |
| |
def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Optional[bytes]:
    """Locate and return the public key for an identity.

    The key is looked up at ``<keytype>/<domain>/<local>/<selector>`` (each
    component url-quoted) below ``source``.

    Args:
        source: either a directory path, or ``ref:<refspec>:<path>`` to read
            the key from a git ref; an empty refspec means the current HEAD ref.
        keytype: key algorithm name, used as the first path component.
        identity: ``local@domain`` identity of the signer.
        selector: key selector, used as the last path component.

    Returns:
        The raw key file contents, or None if no key could be found.

    Exits the process with status 1 on a malformed identity or source, or
    when a ``ref:`` source is used outside of a git tree.
    """
    chunks = identity.split('@', 1)
    if len(chunks) != 2:
        logger.critical('identity must include both local and domain parts')
        sys.exit(1)
    local, domain = chunks
    # urlencode all potentially untrusted bits to make sure nobody tries path-based badness
    components = [urllib.parse.quote_plus(part) for part in (keytype, domain, local, selector)]
    # quote_plus() leaves "." untouched, so "." and ".." would survive quoting
    # and let an identity walk out of the key directory.
    if any(part in ('.', '..') for part in components):
        logger.critical('Refusing to use relative path components in key path: %s', '/'.join(components))
        sys.exit(1)
    keypath = os.path.join(*components)

    if source.startswith('ref:'):
        gittop = get_git_toplevel()
        if not gittop:
            logger.critical('Not in a git tree, so cannot use a ref: source')
            sys.exit(1)
        # format is: ref:refspec:path
        # or it could omit the refspec, meaning "whatever the current ref"
        # but it should always have at least two ":"
        chunks = source.split(':', 2)
        if len(chunks) < 3:
            logger.critical('Invalid source: %s', source)
            logger.critical('Must have refspec and path, e.g.: ref:refs/heads/master:.keys')
            # Without the path there is nothing to look up
            sys.exit(1)
        # grab the key from a fully ref'ed path
        ref = chunks[1]
        pathtop = chunks[2]
        subpath = os.path.join(pathtop, keypath)

        if not ref:
            # What is our current ref?
            cmdargs = ['git', 'symbolic-ref', 'HEAD']
            ecode, out, err = _run_command(cmdargs)
            if ecode == 0:
                ref = out.decode().strip()

        cmdargs = ['git', 'show', f'{ref}:{subpath}']
        ecode, out, err = _run_command(cmdargs)
        if ecode == 0:
            logger.info('KEYSRC : %s:%s', ref, subpath)
            return out
        # Does it exist on disk in gittop?
        fullpath = os.path.join(gittop, subpath)
        if os.path.exists(fullpath):
            with open(fullpath, 'rb') as fh:
                logger.info('KEYSRC : %s', fullpath)
                return fh.read()

        logger.info('Could not find %s in %s', subpath, ref)
        # This is not a critical error for PGP
        return None

    # It's a direct path, then
    fullpath = os.path.join(source, keypath)
    if os.path.exists(fullpath):
        with open(fullpath, 'rb') as fh:
            logger.info('Loaded key from %s', fullpath)
            return fh.read()

    # This is not a critical error for PGP
    logger.info('Could not find %s', fullpath)
    return None
| |
| |
def make_devsig_header(headers: list, payload: bytes, algo: str, identity: Optional[str] = None,
                       selector: Optional[str] = None, maxlen: Optional[int] = None,
                       want_hdrs: Optional[list] = None) -> Tuple[bytes, bytes]:
    """Build the (unsigned) signature header and the digest to be signed.

    Args:
        headers: raw message headers.
        payload: raw message payload.
        algo: algorithm name; written to the header as ``a=<algo>-sha256``.
        identity: optional value for the ``i=`` tag.
        selector: optional value for the ``s=`` tag.
        maxlen: optional limit on the canonical payload length.
        want_hdrs: lower-cased header names to sign; REQ_HDRS when not given.

    Returns:
        ``(dshdr, digest)``: the header ending in an empty ``b=`` tag, and the
        SHA-256 digest over the canonicalized signed headers followed by the
        signature header itself.
    """
    want_hdrs = want_hdrs or REQ_HDRS
    cheaders, cpayload = get_mailinfo_message(headers, payload, want_hdrs, maxlen)

    # bh= carries the hash of the canonical payload
    body_hash = base64.b64encode(hashlib.sha256(cpayload).digest())
    fields = [b'v=1', b'a=%s-sha256' % algo.encode()]
    if identity:
        fields.append(b'i=%s' % identity.encode())
    if selector:
        fields.append(b's=%s' % selector.encode())
    fields += [
        b'h=%s' % b':'.join(want_hdrs),
        b'l=%d' % len(cpayload),
        b'bh=%s' % body_hash,
        # left empty: the signature is appended by the caller
        b'b=',
    ]
    dshval = b'; '.join(fields)

    hdr_hash = hashlib.sha256()
    for cheader in cheaders:
        name, value = cheader.split(b':', 1)
        canon_name = name.strip().lower()
        if canon_name in want_hdrs:
            hdr_hash.update(canon_name + b':' + dkim_canonicalize_header(value))
    # the signature header (minus the signature) is covered by the hash as well
    hdr_hash.update(DEVSIG_HDR.lower() + b':' + dshval)

    return DEVSIG_HDR + b': ' + dshval, hdr_hash.digest()
| |
| |
def get_devsig_header_info(headers) -> Tuple[str, str, str, list, dict]:
    """Extract the signing parameters from a message's signature header.

    Args:
        headers: raw message headers, one bytes entry per header.

    Returns:
        ``(identity, selector, algo, signed_hdrs, hdata)`` where ``identity``
        comes from the ``i=`` tag or, failing that, the From: address;
        ``selector`` from ``s=`` (default ``'default'``); ``algo`` is
        ``'ed25519'`` or ``'openpgp'`` from ``a=`` (default ``'ed25519'``);
        ``signed_hdrs`` is the list of names in ``h=``; and ``hdata`` is the
        dict of all parsed tags (values are bytes).

    Exits the process with status 1 when the signature header is missing,
    lacks ``h=``, does not cover every header in REQ_HDRS, names an
    unsupported algorithm, or no identity can be determined.
    """
    devsig_name = DEVSIG_HDR.lower()
    # decoded once so that log messages show the name, not a bytes repr
    devsig_label = DEVSIG_HDR.decode()
    from_hdr = None
    hdata = None
    for header in headers:
        left, sep, right = header.partition(b':')
        if not sep:
            # Not a "name: value" line, so it is neither of the headers we need
            continue
        hname = left.strip().lower()
        # We want a "from" header and a DEVSIG_HDR
        if hname == b'from':
            from_hdr = right
        elif hname == devsig_name:
            hdata = get_parts_from_header(dkim_canonicalize_header(right))

    if hdata is None:
        logger.critical('FAILED : No "%s:" header in message', devsig_label)
        sys.exit(1)

    # make sure the required headers are in the sig
    if 'h' not in hdata:
        logger.critical('FAILED : h= is required but is not present in %s', devsig_label)
        sys.exit(1)

    signed_hdrs = [x.strip() for x in hdata['h'].split(b':')]
    for rhdr in REQ_HDRS:
        if rhdr not in signed_hdrs:
            logger.critical('FAILED : %s is a required header', rhdr.decode())
            sys.exit(1)

    if 'i' in hdata:
        # Parsed tag values are bytes; callers expect the identity as str
        identity = hdata['i'].decode()
    else:
        # Use the identity from the from header
        if not from_hdr:
            logger.critical('FAILED : No i= in %s, and no From: header!', devsig_label)
            sys.exit(1)
        identity = email.utils.parseaddr(from_hdr.decode())[1]

    if 'a' in hdata:
        apart = hdata['a'].decode()
        if apart.startswith('ed25519'):
            algo = 'ed25519'
        elif apart.startswith('openpgp'):
            algo = 'openpgp'
        else:
            logger.critical('FAILED : Unsupported a= in %s: %s', devsig_label, apart)
            sys.exit(1)
    else:
        # Default is ed25519-sha256
        algo = 'ed25519'

    selector = hdata['s'].decode() if 's' in hdata else 'default'

    return identity, selector, algo, signed_hdrs, hdata
| |
| |
def cmd_sign_ed25519(cmdargs) -> None:
    """Sign the message with an ed25519 key and write it to stdout.

    Reads ``cmdargs.message``, ``cmdargs.privkey`` (a file holding the
    base64-encoded signing key) and ``cmdargs.selector``. The signature
    header is appended after the existing headers.

    Exits the process with status 1 if the key file cannot be read.
    """
    from nacl.signing import SigningKey
    from nacl.encoding import Base64Encoder

    keyfile = cmdargs.privkey
    logger.info('SIGNING : ED25519 using %s', keyfile)
    headers, payload = load_message(cmdargs.message)
    dshdr, digest = make_devsig_header(headers, payload, algo='ed25519', selector=cmdargs.selector)
    try:
        with open(keyfile, 'r') as fh:
            sk = SigningKey(fh.read(), encoder=Base64Encoder)
    except IOError:
        logger.critical('Could not open %s', keyfile)
        sys.exit(1)

    signature = sk.sign(digest, encoder=Base64Encoder)
    # the signature goes after the empty b= tag, split up and folded into the header
    headers.append(folder(dshdr + splitter(signature)))
    signed = b'\r\n'.join(headers) + b'\r\n\r\n' + payload
    logger.info('--- SIGNED MESSAGE STARTS ---')
    sys.stdout.buffer.write(signed)
| |
| |
def verify_ed25519(sigdata: bytes, pk: bytes) -> Optional[bytes]:
    """Check an ed25519 signature with PyNaCl.

    Args:
        sigdata: base64-encoded signed data, as taken from the ``b=`` tag.
        pk: base64-encoded verification key.

    Returns:
        Whatever ``VerifyKey.verify`` returns for a good signature, or None
        when it raises BadSignatureError.
    """
    from nacl.signing import VerifyKey
    from nacl.encoding import Base64Encoder
    from nacl.exceptions import BadSignatureError

    verifier = VerifyKey(pk, encoder=Base64Encoder)
    try:
        signed_content = verifier.verify(sigdata, encoder=Base64Encoder)
    except BadSignatureError:
        return None
    return signed_content
| |
| |
def cmd_sign_pgp(cmdargs) -> None:
    """Sign the message with a PGP key via gpg and write it to stdout.

    Reads ``cmdargs.message``, ``cmdargs.usekey`` (passed to ``gpg -u``) and
    ``cmdargs.selector``. The digest is signed with ``gpg -s`` and the
    base64-encoded result is appended to the signature header.

    Exits the process with gpg's exit status if gpg reports a failure.
    """
    keyid = cmdargs.usekey
    logger.info('SIGNING : PGP using %s', keyid)
    headers, payload = load_message(cmdargs.message)
    dshdr, digest = make_devsig_header(headers, payload, algo='openpgp', selector=cmdargs.selector)
    ecode, out, err = gpg_run_command(['-s', '-u', keyid], digest)
    if ecode > 0:
        logger.critical('Running gpg failed')
        logger.critical(err.decode())
        sys.exit(ecode)

    signature = base64.b64encode(out)
    # the signature goes after the empty b= tag, split up and folded into the header
    headers.append(folder(dshdr + splitter(signature)))
    signed = b'\r\n'.join(headers) + b'\r\n\r\n' + payload
    logger.info('--- SIGNED MESSAGE STARTS ---')
    sys.stdout.buffer.write(signed)
| |
| |
def _gpg_verify(gpgargs: list, bsigdata: bytes, warn_untrusted: bool) -> Optional[bytes]:
    """Run a gpg verification and return the signed content on success."""
    ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
    if ecode > 0:
        logger.critical('FAILED : Failed to verify PGP signature')
        return None

    # the status lines were requested on fd 2, so they arrive in stderr
    good, valid, trusted = check_gpg_status(err)
    if good and valid:
        if warn_untrusted and not trusted:
            logger.warning('WARNING : Insufficient trust on the key')
        return out

    logger.critical('FAILED : Failed to verify PGP signature')
    return None


def verify_openpgp(sigdata: bytes, pk: Optional[bytes]) -> Optional[bytes]:
    """Verify a base64-encoded PGP signature with gpg.

    Args:
        sigdata: base64-encoded output of ``gpg -s`` (the ``b=`` tag value).
        pk: public key material to verify against. When given, it is imported
            into a throwaway keyring and only that keyring is used; when None,
            the default keyring is used and a warning is logged if the key is
            not fully or ultimately trusted.

    Returns:
        The signed content as output by gpg, or None when the signature (or
        the key import) fails.
    """
    try:
        bsigdata = base64.b64decode(sigdata)
    except ValueError:
        # binascii.Error is a ValueError subclass; the data comes from the message
        logger.critical('FAILED : Signature data is not valid base64')
        return None

    vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
    if pk is None:
        logger.info('Verifying using default keyring')
        return _gpg_verify(vrfyargs, bsigdata, warn_untrusted=True)

    # gpg needs a real filesystem path for --keyring. An anonymous temporary
    # file object has no usable name, so keep the keyring inside a temporary
    # directory that is removed when we are done.
    with tempfile.TemporaryDirectory(suffix='.patch-attest-poc') as td:
        temp_keyring = os.path.join(td, 'keyring.gpg')
        keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
        gpgargs = keyringargs + ['--status-fd=1', '--import']
        ecode, out, err = gpg_run_command(gpgargs, stdin=pk)
        # look for IMPORT_OK
        if out.find(b'[GNUPG:] IMPORT_OK') < 0:
            logger.critical('Could not import public key!')
            return None
        # the key was supplied by the caller, so trust on it is not reported
        return _gpg_verify(keyringargs + vrfyargs, bsigdata, warn_untrusted=False)
| |
| |
def cmd_verify(cmdargs):
    """Verify the developer signature on a message.

    Reads ``cmdargs.message`` and looks the signer's public key up below
    ``cmdargs.keypath``. The signature is checked first, then the digest it
    covers is compared to one recalculated from the message. If they differ
    and the signature carries an ``l=`` tag, the comparison is retried with
    the payload limited to that many bytes.

    Returns normally on success; exits the process with status 1 on failure.
    """
    headers, payload = load_message(cmdargs.message)
    identity, selector, algo, signed_hdrs, hdata = get_devsig_header_info(headers)
    if isinstance(identity, bytes):
        # tag values parsed from the header are bytes; the key lookup needs str
        identity = identity.decode()
    if 'b' not in hdata:
        logger.critical('FAILED : b= is required but is not present in %s', DEVSIG_HDR.decode())
        sys.exit(1)

    # Look up the public key for the signing identity
    pk = get_public_key(cmdargs.keypath, algo, identity, selector)

    sdigest = None
    if algo == 'ed25519':
        if not pk:
            logger.critical('FAILED : No public key found for %s', identity)
            sys.exit(1)
        sdigest = verify_ed25519(hdata['b'], pk)
    elif algo == 'openpgp':
        sdigest = verify_openpgp(hdata['b'], pk)

    if not sdigest:
        logger.critical('Failed to verify signature!')
        sys.exit(1)

    # Now calculate our own digest and compare. The header must be rebuilt
    # with exactly the i= and s= values that were signed (as str).
    sig_identity = hdata.get('i', b'').decode()
    sig_selector = hdata.get('s', b'').decode()
    _, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                   selector=sig_selector, want_hdrs=signed_hdrs)
    success = sdigest == digest
    if not success:
        # Try to limit the payload to just the number of bytes specified in the sig header
        try:
            maxlen = int(hdata.get('l', b'0'))
        except ValueError:
            logger.debug('Ignoring non-numeric l= value in %s', DEVSIG_HDR.decode())
            maxlen = 0
        if maxlen > 0:
            _, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                           selector=sig_selector, maxlen=maxlen, want_hdrs=signed_hdrs)
            if sdigest == digest:
                logger.warning('WARNING : Succeeded after trimming payload; the following content was discarded:')
                for line in payload[maxlen:].strip().split(b'\n'):
                    sys.stderr.buffer.write(b'        : %s\n' % line)
                success = True

    if success:
        logger.info('SUCCESS : Signature and content hashes verified')
        return

    logger.critical('FAILED : Failed to verify signature')
    sys.exit(1)
| |
| |
def cmd_gen_ed25519(cmdargs):
    """Generate a new ed25519 keypair and write it to disk.

    The base64-encoded signing key goes to ``<cmdargs.output>.key`` and the
    base64-encoded verify key to ``<cmdargs.output>.pub``. Exits the process
    with status 0 when done.
    """
    from nacl.signing import SigningKey
    logger.info('Generating: new ED25519 key')
    newkey = SigningKey.generate()

    keyfile = cmdargs.output + '.key'
    # Create the private key file readable by its owner only, instead of
    # whatever the current umask would allow.
    keyfd = os.open(keyfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(keyfd, 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey)))
    logger.info('Wrote: %s.key', cmdargs.output)

    with open(cmdargs.output + '.pub', 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey.verify_key)))
    logger.info('Wrote: %s.pub', cmdargs.output)
    sys.exit(0)
| |
| |
if __name__ == '__main__':
    # Command-line entry point: parse arguments, set up logging on the module
    # logger and dispatch to the handler of the chosen sub-command.
    import argparse

    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        prog='main',
        description='A proof of concept tool for header-based email patch attestation',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-m', '--message', default='emails/dev-unsigned.eml',
                        help='File with the message to work with')
    # A plain flag: without store_true the option would demand a value and
    # could swallow the sub-command name that follows it.
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Print extra debugging output')
    subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')

    # sign-pgp
    sp_spgp = subparsers.add_parser('sign-pgp', help='Sign with PGP key')
    sp_spgp.add_argument('-k', '--usekey', default='AAAABBBBCCCCDDDD')
    sp_spgp.add_argument('-s', '--selector')
    sp_spgp.set_defaults(func=cmd_sign_pgp)

    # sign-ed25519
    sp_sed25519 = subparsers.add_parser('sign-ed25519', help='Sign with an ed25519 key')
    sp_sed25519.add_argument('-k', '--privkey', default='dev.key')
    sp_sed25519.add_argument('-s', '--selector')
    sp_sed25519.set_defaults(func=cmd_sign_ed25519)

    # gen-ed25519
    sp_gened25519 = subparsers.add_parser('gen-ed25519', help='Generate an ed25519 keypair')
    sp_gened25519.add_argument('-o', '--output', default='new_ed25519')
    sp_gened25519.set_defaults(func=cmd_gen_ed25519)

    # verify
    sp_verify = subparsers.add_parser('verify', help='Verify a signed message')
    sp_verify.add_argument('-p', '--keypath', default='ref:refs/heads/master:.keys')
    sp_verify.set_defaults(func=cmd_verify)

    args = parser.parse_args()

    # The logger passes everything on; the handler level decides what is shown
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if args.verbose:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)

    # No sub-command given: there is no handler to dispatch to
    if 'func' not in args:
        parser.print_help()
        sys.exit(1)

    args.func(args)