blob: 2e732e32fdaf46c1ff00112d5394b738f8f8ed1f [file] [log] [blame]
#!/usr/bin/env python3
# Copyright (C) 2020-2021 by the Linux Foundation
# SPDX-License-Identifier: MIT-0
import sys
import os
import base64
import email.utils
import re
import subprocess
import hashlib
import urllib.parse
import logging
import tempfile
from typing import Tuple, Optional
# Name of the message header that carries the developer signature.
DEVSIG_HDR = b'X-Developer-Signature'
# Lower-cased names of the headers that every signature must list in h=;
# also used as the default set of headers to sign.
REQ_HDRS = [b'from', b'subject', b'date', b'message-id']
# Module-level logger; handlers and levels are configured by the script entry point.
logger = logging.getLogger(__name__)
def _run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run an external command and capture everything it produces.

    :param cmdargs: argv-style list; it is passed to Popen without a shell.
    :param stdin: optional bytes fed to the child's standard input.
    :return: tuple of (exit code, captured stdout, captured stderr).
    """
    proc = subprocess.Popen(
        cmdargs,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() writes stdin (if any), reads both pipes to EOF and waits.
    captured_out, captured_err = proc.communicate(input=stdin)
    return proc.returncode, captured_out, captured_err
def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run ``gpg`` with a fixed set of leading options followed by *cmdargs*.

    :param cmdargs: additional gpg arguments, appended after the fixed options.
    :param stdin: optional bytes fed to gpg's standard input.
    :return: tuple of (exit code, stdout, stderr) from the gpg process.
    """
    # Options every invocation gets: batch mode, and no automatic key
    # retrieval or trustdb checking.
    fixed_args = ['gpg', '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
    return _run_command(fixed_args + cmdargs, stdin)
def check_gpg_status(status: bytes) -> Tuple[bool, bool, bool]:
    """Scan gpg status output for signature and trust markers.

    :param status: raw bytes of gpg's ``[GNUPG:]`` status lines.
    :return: tuple of (good, valid, trusted) where each flag is True when a
             GOODSIG line, a VALIDSIG line, or a TRUST_FULLY/TRUST_ULTIMATE
             line respectively is present at the start of some line.
    """
    markers = (
        rb'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$',
        rb'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)',
        rb'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)',
    )
    good, valid, trusted = [
        re.search(marker, status, flags=re.M) is not None for marker in markers
    ]
    return good, valid, trusted
def get_git_mailinfo(payload: bytes) -> Tuple[bytes, bytes, bytes]:
    """Run ``git mailinfo`` on a raw message and collect its three outputs.

    :param payload: the complete message (headers and body) fed on stdin.
    :return: tuple of (message file contents, patch file contents, stdout of
             git-mailinfo), all as bytes.

    Exits the process with status 1 if git-mailinfo does not exit cleanly.
    """
    with tempfile.TemporaryDirectory(suffix='.git-mailinfo') as td:
        # git-mailinfo writes the commit message and the patch to these files
        mf = os.path.join(td, 'm')
        pf = os.path.join(td, 'p')
        cmdargs = ['git', 'mailinfo', '--encoding=utf-8', mf, pf]
        ecode, out, err = _run_command(cmdargs, stdin=payload)
        # A child killed by a signal reports a negative return code, so test
        # for any non-zero value rather than just positive ones.
        if ecode != 0:
            logger.critical('FAILED : Failed running git-mailinfo:')
            # stderr is not guaranteed to be valid UTF-8; never let decoding
            # mask the real failure.
            logger.critical(err.decode(errors='replace'))
            sys.exit(1)
        with open(mf, 'rb') as mfh:
            m = mfh.read()
        with open(pf, 'rb') as pfh:
            p = pfh.read()
        return m, p, out
def load_message(msgfile: str) -> Tuple[list, bytes]:
    """Read a message file and split it into raw headers and payload.

    Python's email package is deliberately avoided so that no processing is
    applied to the contents that could result in a wrong hash being generated.

    :param msgfile: path of the message file to read.
    :return: tuple of (list of header bytes, payload bytes). Folded header
             continuation lines are kept attached to their header, joined by
             CRLF; payload lines are joined by CRLF with one trailing CRLF.

    Exits with status 1 if the first line is a continuation line.
    """
    headers = []
    body_lines = []
    with open(msgfile, 'rb') as fh:
        logger.info('MSGSRC : %s', msgfile)
        past_headers = False
        for raw in fh:
            # drop whatever line terminator the file used
            line = re.sub(rb'[\r\n]*$', b'', raw)
            if past_headers:
                body_lines.append(line)
            elif not line:
                # the first empty line separates headers from the payload
                past_headers = True
            elif line[0] in (0x09, 0x20):
                # leading TAB or SP marks a wrapped header
                if not headers:
                    # What?
                    logger.critical('Not valid RFC2822 message')
                    sys.exit(1)
                # attach it to the last header
                headers[-1] += b'\r\n' + line
            else:
                headers.append(line)
    return headers, b'\r\n'.join(body_lines) + b'\r\n'
def get_mailinfo_message(oheaders: list, opayload: bytes, want_hdrs: list, maxlen: Optional[int]) -> Tuple[list, bytes]:
    """Pre-canonicalize a message with ``git mailinfo``.

    :param oheaders: original header lines (bytes), as loaded from the message.
    :param opayload: original payload bytes.
    :param want_hdrs: lower-cased header names to keep in the result.
    :param maxlen: if truthy, the canonical payload is truncated to this many bytes.
    :return: tuple of (canonical headers, canonical payload). From, Subject and
             Date values are replaced by what git-mailinfo reported; other
             wanted headers are passed through unchanged.
    """
    # We pre-canonicalize using git mailinfo
    origmsg = b'\r\n'.join(oheaders) + b'\r\n\r\n' + opayload
    m, p, i = get_git_mailinfo(origmsg)
    # Generate a new payload using m and p and canonicalize with \r\n endings,
    # trimming any excess blank lines ("simple" DKIM canonicalization).
    body = re.sub(rb'[\r\n]*$', b'', m + p)
    cpayload = b''.join(
        re.sub(rb'[\r\n]*$', b'', line) + b'\r\n' for line in body.split(b'\n')
    )
    if maxlen:
        logger.debug('Limiting payload length to %d bytes', maxlen)
        cpayload = cpayload[:maxlen]

    # Parse the "Name: value" lines printed by git-mailinfo. Lines without a
    # colon (including the single empty line produced when the output is
    # empty) are skipped instead of crashing the unpack.
    idata = dict()
    for line in re.sub(rb'[\r\n]*$', b'', i).split(b'\n'):
        left, sep, right = line.partition(b':')
        if not sep:
            continue
        idata[left.lower()] = right.strip()

    # Now substituting headers returned by mailinfo
    cheaders = list()
    for oheader in oheaders:
        left, sep, right = oheader.partition(b':')
        if not sep:
            # not a "name: value" header line; nothing we can sign
            continue
        lleft = left.lower()
        if lleft not in want_hdrs:
            continue
        if lleft == b'from':
            right = b' ' + idata.get(b'author', b'') + b' <' + idata.get(b'email', b'') + b'>'
        elif lleft == b'subject':
            right = b' ' + idata.get(b'subject', b'')
        elif lleft == b'date':
            right = b' ' + idata.get(b'date', b'')
        cheaders.append(left + b':' + right)
    return cheaders, cpayload
def splitter(longstr: bytes, limit: int = 78) -> bytes:
    """Break a long byte string into space-separated pieces.

    :param longstr: the bytes to split.
    :param limit: maximum length of each piece; the first piece is cut two
                  bytes shorter than the rest.
    :return: the pieces joined with single spaces. Input no longer than
             *limit* is returned unchanged.
    """
    pieces = []
    remaining = longstr
    # only the first cut is shortened
    cut = limit - 2
    while len(remaining) > limit:
        pieces.append(remaining[:cut])
        remaining = remaining[cut:]
        cut = limit
    pieces.append(remaining)
    return b' '.join(pieces)
def folder(longhdr: bytes, limit: int = 78) -> bytes:
    """Fold a header into CRLF-separated lines at space boundaries.

    :param longhdr: the unfolded header bytes.
    :param limit: length threshold used to decide when to start a new line.
    :return: the folded header; every continuation line starts with a space.

    A chunk that is itself longer than *limit* is emitted on a line of its
    own. No empty line is ever produced, because an empty line inside a
    header block would terminate the headers.
    """
    lines = list()
    line = b''
    for chunk in longhdr.split(b' '):
        if len(line + chunk) > limit:
            # Flush the pending line, but only if there is one: flushing an
            # empty line would put a bare CRLF into the header.
            if line:
                lines.append(line)
            line = b''
            if len(chunk) > limit:
                # the chunk itself is longer than limit, so append it as-is;
                # only continuation lines get the leading space
                lines.append(b' ' + chunk if lines else chunk)
                continue
        if not lines and not line:
            # We're at the very start, so no need to prepend with ' '
            line += chunk
        else:
            line += b' ' + chunk
    if line:
        lines.append(line)
    return b'\r\n'.join(lines)
def get_git_toplevel(gitdir: str = None) -> str:
    """Ask git for the top-level directory of the working tree.

    :param gitdir: optional value passed to git via ``--git-dir``.
    :return: the stripped output of ``git rev-parse --show-toplevel``, or an
             empty string when git exits with a non-zero status.
    """
    gitdir_args = ['--git-dir', gitdir] if gitdir else []
    cmdargs = ['git', *gitdir_args, 'rev-parse', '--show-toplevel']
    ecode, out, _ = _run_command(cmdargs)
    return out.decode().strip() if ecode == 0 else ''
def get_parts_from_header(hval: bytes) -> dict:
    """Parse a ``k=v; k=v`` style header value into a dict.

    All whitespace is removed from the value first. Chunks without an ``=``
    are ignored; if a key repeats, the last occurrence wins.

    :param hval: the header value bytes.
    :return: dict mapping decoded (str) keys to bytes values.
    """
    compact = re.sub(rb'\s*', b'', hval)
    hdata = {}
    for chunk in compact.split(b';'):
        # split on the first '=' only, so values may themselves contain '='
        name, sep, value = chunk.partition(b'=')
        if sep:
            hdata[name.decode()] = value
    return hdata
def dkim_canonicalize_header(hval: bytes) -> bytes:
    """Apply DKIM-style "relaxed" canonicalization to a header value.

    Only the relaxed form is implemented. The steps are:

    * unfold continuation lines by dropping every CR and LF;
    * collapse each run of whitespace into a single space;
    * strip whitespace from both ends of the value (this also removes any
      whitespace that followed the colon);
    * terminate the result with exactly one CRLF.

    :param hval: the header value, i.e. everything after the colon.
    :return: the canonicalized value ending in CRLF.
    """
    unfolded = re.sub(rb'[\r\n]', b'', hval)
    single_spaced = re.sub(rb'\s+', b' ', unfolded)
    return single_spaced.strip() + b'\r\n'
def get_public_key(source: str, keytype: str, identity: str, selector: str) -> Optional[bytes]:
    """Locate and read a public key for an identity.

    The key is looked up under ``<keytype>/<domain>/<local>/<selector>``
    (each component URL-quoted) relative to *source*.

    :param source: either ``ref:<refspec>:<path>`` (refspec may be empty,
                   meaning the current HEAD ref) or a plain directory path.
    :param keytype: key type, used as the first path component.
    :param identity: ``local@domain`` identity of the signer.
    :param selector: key selector, used as the last path component.
    :return: the key file contents, or None if no key was found.

    Exits with status 1 on a malformed identity, a malformed ``ref:`` source,
    a path component that would traverse directories, or when a ``ref:``
    source is used outside a git tree.
    """
    chunks = identity.split('@', 1)
    if len(chunks) != 2:
        logger.critical('identity must include both local and domain parts')
        sys.exit(1)
    local, domain = chunks
    # urlencode all potentially untrusted bits to make sure nobody tries path-based badness
    components = [urllib.parse.quote_plus(part) for part in (keytype, domain, local, selector)]
    # quote_plus() leaves dots alone, so "." and ".." must be rejected
    # explicitly or they would let the lookup walk out of the key directory.
    if any(component in ('.', '..') for component in components):
        logger.critical('Refusing to use a relative path component in the key path')
        sys.exit(1)
    keypath = os.path.join(*components)

    if source.startswith('ref:'):
        gittop = get_git_toplevel()
        if not gittop:
            logger.critical('Not in a git tree, so cannot use a ref: source')
            sys.exit(1)
        # format is: ref:refspec:path
        # or it could omit the refspec, meaning "whatever the current ref"
        # but it should always have at least two ":"
        chunks = source.split(':', 2)
        if len(chunks) < 3:
            logger.critical('Invalid source: %s', source)
            logger.critical('Must have refspec and path, e.g.: ref:refs/heads/master:.keys')
            # Without this exit the chunks[2] lookup below raises IndexError
            sys.exit(1)
        # grab the key from a fully ref'ed path
        ref = chunks[1]
        pathtop = chunks[2]
        subpath = os.path.join(pathtop, keypath)
        if not ref:
            # What is our current ref?
            cmdargs = ['git', 'symbolic-ref', 'HEAD']
            ecode, out, err = _run_command(cmdargs)
            if ecode == 0:
                ref = out.decode().strip()
        cmdargs = ['git', 'show', f'{ref}:{subpath}']
        ecode, out, err = _run_command(cmdargs)
        if ecode == 0:
            logger.info('KEYSRC : %s:%s', ref, subpath)
            return out
        # Does it exist on disk in gittop?
        fullpath = os.path.join(gittop, subpath)
        # isfile() rather than exists(): a directory at this path is not a key
        if os.path.isfile(fullpath):
            with open(fullpath, 'rb') as fh:
                logger.info('KEYSRC : %s', fullpath)
                return fh.read()
        logger.info('Could not find %s in %s', subpath, ref)
        # This is not a critical error for PGP
        return None

    # It's a direct path, then
    fullpath = os.path.join(source, keypath)
    if os.path.isfile(fullpath):
        with open(fullpath, 'rb') as fh:
            logger.info('Loaded key from %s', fullpath)
            return fh.read()
    # This is not a critical error for PGP
    logger.info('Could not find %s', fullpath)
    return None
def make_devsig_header(headers: list, payload: bytes, algo: str, identity: Optional[str] = None,
                       selector: Optional[str] = None, maxlen: Optional[int] = None,
                       want_hdrs: Optional[list] = None) -> Tuple[bytes, bytes]:
    """Build the unsigned signature header and the digest that gets signed.

    :param headers: raw header lines of the message.
    :param payload: raw payload of the message.
    :param algo: algorithm name, emitted as ``a=<algo>-sha256``.
    :param identity: optional value for the ``i=`` field.
    :param selector: optional value for the ``s=`` field.
    :param maxlen: optional limit on the canonical payload length.
    :param want_hdrs: header names to sign; defaults to REQ_HDRS.
    :return: tuple of (header bytes ending in an empty ``b=``, sha256 digest
             over the canonical wanted headers plus the signature header).
    """
    want_hdrs = want_hdrs or REQ_HDRS
    cheaders, cpayload = get_mailinfo_message(headers, payload, want_hdrs, maxlen)

    # body hash goes into bh=
    body_hash = base64.b64encode(hashlib.sha256(cpayload).digest())

    fields = [b'v=1', b'a=%s-sha256' % algo.encode()]
    if identity:
        fields.append(b'i=%s' % identity.encode())
    if selector:
        fields.append(b's=%s' % selector.encode())
    fields += [
        b'h=%s' % b':'.join(want_hdrs),
        b'l=%d' % len(cpayload),
        b'bh=%s' % body_hash,
        b'b=',
    ]
    dshval = b'; '.join(fields)

    # header hash: each wanted header in message order, then the signature
    # header itself with its still-empty b= field
    header_hash = hashlib.sha256()
    for cheader in cheaders:
        name, value = cheader.split(b':', 1)
        name = name.strip().lower()
        if name in want_hdrs:
            header_hash.update(name + b':' + dkim_canonicalize_header(value))
    header_hash.update(DEVSIG_HDR.lower() + b':' + dshval)

    return DEVSIG_HDR + b': ' + dshval, header_hash.digest()
def get_devsig_header_info(headers) -> Tuple[str, str, str, list, dict]:
    """Extract signature parameters from a message's headers.

    :param headers: raw header lines (bytes) of the message.
    :return: tuple of (identity, selector, algo, signed_hdrs, hdata) where
             identity, selector and algo are str, signed_hdrs is the list of
             header names (bytes) from ``h=``, and hdata is the parsed
             signature header with str keys and bytes values.

    Exits with status 1 if the signature header is missing, lacks ``h=``,
    does not cover every header in REQ_HDRS, names an unsupported ``a=``, or
    carries no ``i=`` while the message has no From header.
    """
    # Human-readable header name for log messages (avoids the b'...' repr)
    devsig_name = DEVSIG_HDR.decode()
    devsig_lname = DEVSIG_HDR.lower()
    from_hdr = None
    hdata = None
    # We want a "from" header and a DEVSIG_HDR
    for header in headers:
        left, sep, right = header.partition(b':')
        if not sep:
            # not a "name: value" line; ignore it rather than crash
            continue
        hname = left.strip().lower()
        if hname == b'from':
            from_hdr = right
        elif hname == devsig_lname:
            hdata = get_parts_from_header(dkim_canonicalize_header(right))

    if hdata is None:
        logger.critical('FAILED : No "%s:" header in message', devsig_name)
        sys.exit(1)

    # make sure the required headers are in the sig
    if 'h' not in hdata:
        logger.critical('FAILED : h= is required but is not present in %s', devsig_name)
        sys.exit(1)
    signed_hdrs = [x.strip() for x in hdata['h'].split(b':')]
    for rhdr in REQ_HDRS:
        if rhdr not in signed_hdrs:
            logger.critical('FAILED : %s is a required header', rhdr.decode())
            sys.exit(1)

    if 'i' in hdata:
        # hdata values are bytes; callers need the identity as str
        identity = hdata['i'].decode()
    else:
        # Use the identity from the from header
        if not from_hdr:
            logger.critical('FAILED : No i= in %s, and no From: header!', devsig_name)
            sys.exit(1)
        identity = email.utils.parseaddr(from_hdr.decode())[1]

    if 'a' in hdata:
        apart = hdata['a'].decode()
        if apart.startswith('ed25519'):
            algo = 'ed25519'
        elif apart.startswith('openpgp'):
            algo = 'openpgp'
        else:
            logger.critical('FAILED : Unsupported a= in %s: %s', devsig_name, apart)
            sys.exit(1)
    else:
        # Default is ed25519-sha256
        algo = 'ed25519'

    selector = hdata['s'].decode() if 's' in hdata else 'default'
    return identity, selector, algo, signed_hdrs, hdata
def cmd_sign_ed25519(cmdargs) -> None:
    """Sign a message with an ed25519 private key and write it to stdout.

    Reads ``cmdargs.message``, ``cmdargs.privkey`` (file holding the
    base64-encoded signing key) and ``cmdargs.selector``. Exits with status 1
    if the key file cannot be opened.
    """
    from nacl.signing import SigningKey
    from nacl.encoding import Base64Encoder

    logger.info('SIGNING : ED25519 using %s', cmdargs.privkey)
    headers, payload = load_message(cmdargs.message)
    sig_header, digest = make_devsig_header(headers, payload, algo='ed25519', selector=cmdargs.selector)

    try:
        with open(cmdargs.privkey, 'r') as keyfile:
            signing_key = SigningKey(keyfile.read(), encoder=Base64Encoder)
    except IOError:
        logger.critical('Could not open %s', cmdargs.privkey)
        sys.exit(1)

    signature = signing_key.sign(digest, encoder=Base64Encoder)
    # fill in b= and fold the result, then add it after the existing headers
    sig_header = folder(sig_header + splitter(signature))
    all_headers = headers + [sig_header]

    logger.info('--- SIGNED MESSAGE STARTS ---')
    sys.stdout.buffer.write(b'\r\n'.join(all_headers) + b'\r\n\r\n' + payload)
def verify_ed25519(sigdata: bytes, pk: bytes) -> Optional[bytes]:
    """Verify a base64-encoded ed25519 signed message.

    :param sigdata: base64-encoded signed data.
    :param pk: base64-encoded public (verify) key.
    :return: whatever ``VerifyKey.verify`` returns on success, or None when
             it raises BadSignatureError.
    """
    from nacl.signing import VerifyKey
    from nacl.encoding import Base64Encoder
    from nacl.exceptions import BadSignatureError

    verifier = VerifyKey(pk, encoder=Base64Encoder)
    try:
        verified = verifier.verify(sigdata, encoder=Base64Encoder)
    except BadSignatureError:
        verified = None
    return verified
def cmd_sign_pgp(cmdargs) -> None:
    """Sign a message with a PGP key via gpg and write it to stdout.

    Reads ``cmdargs.message``, ``cmdargs.usekey`` (passed to ``gpg -u``) and
    ``cmdargs.selector``. Exits with gpg's exit code if gpg reports failure.
    """
    logger.info('SIGNING : PGP using %s', cmdargs.usekey)
    headers, payload = load_message(cmdargs.message)
    sig_header, digest = make_devsig_header(headers, payload, algo='openpgp', selector=cmdargs.selector)

    # gpg signs the digest handed to it on stdin
    ecode, gpg_out, gpg_err = gpg_run_command(['-s', '-u', cmdargs.usekey], digest)
    if ecode > 0:
        logger.critical('Running gpg failed')
        logger.critical(gpg_err.decode())
        sys.exit(ecode)

    # fill in b= with the base64 of gpg's output and fold the result
    sig_header = folder(sig_header + splitter(base64.b64encode(gpg_out)))
    all_headers = headers + [sig_header]

    logger.info('--- SIGNED MESSAGE STARTS ---')
    sys.stdout.buffer.write(b'\r\n'.join(all_headers) + b'\r\n\r\n' + payload)
def verify_openpgp(sigdata: bytes, pk: Optional[bytes]) -> Optional[bytes]:
    """Verify a base64-encoded PGP signed blob with gpg.

    :param sigdata: base64-encoded output of the signing gpg run.
    :param pk: public key material to verify against. When given, it is
               imported into a throwaway keyring and only that keyring is
               used. When None, gpg's default keyring is used and a warning
               is logged if the key is not fully or ultimately trusted.
    :return: gpg's stdout from the verify run on success, otherwise None.
    """
    bsigdata = base64.b64decode(sigdata)
    vrfyargs = ['--verify', '--output', '-', '--status-fd=2']
    if pk is not None:
        # gpg needs a real filesystem path for --keyring. A TemporaryFile
        # object has no usable name, so keep the keyring inside a temporary
        # directory that is removed again once verification is done.
        with tempfile.TemporaryDirectory(suffix='.patch-attest-poc') as keyring_dir:
            temp_keyring = os.path.join(keyring_dir, 'keyring.gpg')
            keyringargs = ['--no-default-keyring', f'--keyring={temp_keyring}']
            gpgargs = keyringargs + ['--status-fd=1', '--import']
            ecode, out, err = gpg_run_command(gpgargs, stdin=pk)
            # look for IMPORT_OK
            if out.find(b'[GNUPG:] IMPORT_OK') < 0:
                logger.critical('Could not import public key!')
                return None
            gpgargs = keyringargs + vrfyargs
            ecode, out, err = gpg_run_command(gpgargs, stdin=bsigdata)
            if ecode > 0:
                logger.critical('FAILED : Failed to verify PGP signature')
                return None
            # the status lines were sent to stderr via --status-fd=2
            good, valid, trusted = check_gpg_status(err)
            if good and valid:
                return out
            logger.critical('FAILED : Failed to verify PGP signature')
            return None

    logger.info('Verifying using default keyring')
    ecode, out, err = gpg_run_command(vrfyargs, stdin=bsigdata)
    if ecode > 0:
        logger.critical('FAILED : Failed to verify PGP signature')
        return None
    good, valid, trusted = check_gpg_status(err)
    if good and valid:
        if not trusted:
            logger.warning('WARNING : Insufficient trust on the key')
        return out
    logger.critical('FAILED : Failed to verify PGP signature')
    return None
def cmd_verify(cmdargs):
    """Verify the developer signature on a message.

    Reads ``cmdargs.message`` and ``cmdargs.keypath``. The signature in
    ``b=`` is checked first, then the digest it covers is compared with a
    digest recomputed from the message. If they differ and the signature
    carries ``l=``, the comparison is retried with the canonical payload
    limited to that many bytes.

    Returns normally on success; exits with status 1 on any failure.
    """
    headers, payload = load_message(cmdargs.message)
    identity, selector, algo, signed_hdrs, hdata = get_devsig_header_info(headers)
    if 'b' not in hdata:
        logger.critical('FAILED : b= is required but is not present in %s', DEVSIG_HDR.decode())
        sys.exit(1)
    # The key lookup works on str; normalise in case the identity was taken
    # straight from the header bytes.
    if isinstance(identity, bytes):
        identity = identity.decode()

    # Check if we have this public key
    pk = get_public_key(cmdargs.keypath, algo, identity, selector)
    sdigest = None
    if algo == 'ed25519':
        if not pk:
            sys.exit(1)
        sdigest = verify_ed25519(hdata['b'], pk)
    elif algo == 'openpgp':
        sdigest = verify_openpgp(hdata['b'], pk)
    if not sdigest:
        logger.critical('Failed to verify signature!')
        sys.exit(1)

    # Now calculate our own digest and compare. i= and s= must be passed as
    # str on every attempt because make_devsig_header() encodes them itself.
    sig_identity = hdata.get('i', b'').decode()
    sig_selector = hdata.get('s', b'').decode()
    dshdr, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                       selector=sig_selector, want_hdrs=signed_hdrs)
    success = sdigest == digest
    if not success:
        # Try to limit the payload to just the number of bytes specified in the sig header
        try:
            maxlen = int(hdata.get('l', b'0'))
        except ValueError:
            maxlen = 0
        if maxlen > 0:
            dshdr, digest = make_devsig_header(headers, payload, algo, identity=sig_identity,
                                               selector=sig_selector, maxlen=maxlen,
                                               want_hdrs=signed_hdrs)
            if sdigest == digest:
                logger.warning('WARNING : Succeeded after trimming payload; the following content was discarded:')
                for line in payload[maxlen:].strip().split(b'\n'):
                    sys.stderr.buffer.write(b' : %s\n' % line)
                success = True

    if success:
        logger.info('SUCCESS : Signature and content hashes verified')
        return
    logger.critical('FAILED : Failed to verify signature')
    sys.exit(1)
def cmd_gen_ed25519(cmdargs):
    """Generate a new ed25519 keypair and write it to disk.

    Writes the base64-encoded private key to ``<cmdargs.output>.key`` and the
    base64-encoded public key to ``<cmdargs.output>.pub``, then exits with
    status 0.
    """
    from nacl.signing import SigningKey

    logger.info('Generating: new ED25519 key')
    newkey = SigningKey.generate()

    keyfile = cmdargs.output + '.key'
    # Request owner-only permissions when the private key file is created,
    # instead of whatever the process umask would give a plain open().
    keyfd = os.open(keyfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(keyfd, 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey)))
    logger.info('Wrote: %s.key', cmdargs.output)

    with open(cmdargs.output + '.pub', 'wb') as fh:
        fh.write(base64.b64encode(bytes(newkey.verify_key)))
    logger.info('Wrote: %s.pub', cmdargs.output)
    sys.exit(0)
if __name__ == '__main__':
    # Command-line entry point: parse arguments, set up logging, then
    # dispatch to the function registered by the chosen sub-command.
    import argparse

    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        prog='main',
        description='A proof of concept tool for header-based email patch attestation',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-m', '--message', default='emails/dev-unsigned.eml',
                        help='File with the message to work with')
    # store_true makes -v a real flag; without an action argparse would
    # demand a value after it.
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Print extra debugging output')

    subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')

    # sign-pgp
    sp_spgp = subparsers.add_parser('sign-pgp', help='Sign with PGP key')
    sp_spgp.add_argument('-k', '--usekey', default='AAAABBBBCCCCDDDD')
    sp_spgp.add_argument('-s', '--selector')
    sp_spgp.set_defaults(func=cmd_sign_pgp)

    # sign-ed25519
    sp_sed25519 = subparsers.add_parser('sign-ed25519', help='Sign with an ed25519 key')
    sp_sed25519.add_argument('-k', '--privkey', default='dev.key')
    sp_sed25519.add_argument('-s', '--selector')
    sp_sed25519.set_defaults(func=cmd_sign_ed25519)

    # gen-ed25519
    sp_gened25519 = subparsers.add_parser('gen-ed25519', help='Generate an ed25519 keypair')
    sp_gened25519.add_argument('-o', '--output', default='new_ed25519')
    sp_gened25519.set_defaults(func=cmd_gen_ed25519)

    # verify
    sp_verify = subparsers.add_parser('verify', help='Verify a signed message')
    sp_verify.add_argument('-p', '--keypath', default='ref:refs/heads/master:.keys')
    sp_verify.set_defaults(func=cmd_verify)

    args = parser.parse_args()

    # The logger passes everything through; the handler decides what is shown.
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if args.verbose:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)

    # No sub-command given: nothing registered a func default
    if 'func' not in args:
        parser.print_help()
        sys.exit(1)
    args.func(args)