blob: 2807be0bdcf58015b13bb8becc561c37bf92b645 [file] [log] [blame]
#!/usr/bin/env python3
import sys
import os
import base64
import email
import email.header
import email.utils
import b4
import time
import dkim
import re
import dns.resolver
import binascii
import subprocess
import hashlib
import requests
import anybase32
import urllib.parse
import logging
from nacl.signing import SigningKey, VerifyKey
from nacl.encoding import Base64Encoder
from nacl.exceptions import BadSignatureError
from tempfile import mkstemp
from Cryptodome.Signature import pkcs1_15
from Cryptodome.Hash import SHA256
from Cryptodome.PublicKey import RSA
from typing import Tuple
# Name of the header that carries the attestation hashes (see gen_hashes_header)
XPH_HDR = 'X-Patch-Hashes'
# Name of the header that carries the signature made over the hashes header
XPS_HDR = 'X-Patch-Sig'
# Module-level logger; its handler and levels are set up in the __main__ section
logger = logging.getLogger(__name__)
def _run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run an external command and capture everything it produces.

    :param cmdargs: argv-style list, executed directly (no shell involved)
    :param stdin: optional bytes to feed to the child's standard input
    :return: tuple of (exit code, captured stdout, captured stderr)
    """
    pipe = subprocess.PIPE
    with subprocess.Popen(cmdargs, stdout=pipe, stdin=pipe, stderr=pipe) as child:
        # communicate() writes stdin, drains both output pipes and waits
        captured_out, captured_err = child.communicate(input=stdin)
        exit_code = child.returncode
    return exit_code, captured_out, captured_err
def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Run gpg non-interactively with the given extra arguments.

    The options that are always prepended keep gpg in batch mode and stop
    it from retrieving keys or checking the trustdb automatically.

    :param cmdargs: gpg arguments to append after the fixed options
    :param stdin: optional bytes to feed to gpg on standard input
    :return: tuple of (exit code, stdout, stderr) from _run_command
    """
    fixed = ['gpg', '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
    return _run_command(fixed + cmdargs, stdin)
def load_message(msgfile: str):
    """Parse the email message stored in ``msgfile``.

    :param msgfile: path to a file holding a single raw message
    :return: the message object produced by email.message_from_bytes
    """
    with open(msgfile, 'rb') as source:
        # Only announce the source once we know it could be opened
        logger.info('Using %s as message source', msgfile)
        raw = source.read()
        parsed = email.message_from_bytes(raw)
    return parsed
def splitter(longstr: str, limit: int = 77) -> str:
    """Break a long string into space-separated chunks.

    Every chunk is at most ``limit`` characters long. The first chunk is two
    characters shorter, because callers in this file append the result to a
    header value that already ends with the two-character ``b=`` tag.

    :param longstr: the string to split (may be empty)
    :param limit: maximum chunk length, must be at least 1
    :return: the chunks joined with single spaces
    :raises ValueError: if ``limit`` is smaller than 1
    """
    if limit < 1:
        # A non-positive limit would never shrink the string
        raise ValueError(f'limit must be a positive integer, not {limit!r}')
    chunks = []
    # Reserve two characters on the first chunk, but always keep at least
    # one so that very small limits still consume input
    width = max(1, limit - 2)
    while len(longstr) > limit:
        chunks.append(longstr[:width])
        longstr = longstr[width:]
        width = limit
    chunks.append(longstr)
    return ' '.join(chunks)
def verify_identity_domain(msg, identity: str, domain: str) -> bool:
    """Check that the signing identity agrees with the signing domain and From.

    :param msg: parsed email message whose first From address is inspected
    :param identity: value of the i= tag, in ``[local-part]@domain`` form
    :param domain: value of the d= tag
    :return: True when every check passes; False (after logging why) otherwise
    """
    if '@' not in identity:
        logger.critical('FAIL : identity must contain @ (i=%s)', identity)
        return False
    # Split on the last @ so that an @ inside the local part cannot change
    # what is treated as the identity's domain
    idomain = identity.rsplit('@', 1)[1].lower()
    sdomain = domain.lower()
    # The identity's domain must be the signing domain or one of its
    # subdomains. A bare suffix match would accept i=@notexample.org
    # for d=example.org.
    if not sdomain or (idomain != sdomain and not idomain.endswith(f'.{sdomain}')):
        logger.critical('FAIL : domain (d=%s) is not in identity (i=%s)', domain, identity)
        return False
    fromaddrs = email.utils.getaddresses(msg.get_all('from', []))
    if not fromaddrs or not fromaddrs[0][1]:
        logger.critical('FAIL : message does not have a usable From header')
        return False
    fromeml = fromaddrs[0][1]
    # The From address is supposed to be in the identity's domain
    if not fromeml.lower().endswith(f'@{idomain}'):
        logger.critical('FAIL : identity (i=%s) does not match from (from=%s)', identity, fromeml)
        return False
    logger.info('PASS : identity and domain match From header')
    return True
def get_dkim_key(domain: str, selector: str, wantkey: str = 'rsa', timeout: int = 5) -> str:
    """Look up a DKIM public key of the requested type in DNS.

    Queries the TXT records at ``<selector>._domainkey.<domain>.`` and picks
    the first v=DKIM1 record whose key type matches ``wantkey``.

    :param domain: signing domain (d= tag)
    :param selector: DKIM selector (s= tag)
    :param wantkey: required key type, compared against the record's k= tag
    :param timeout: total lifetime of the DNS query, in seconds
    :return: the base64 value of the record's p= tag
    :raises SystemExit: if the lookup fails or no suitable record is found
    """
    name = f'{selector}._domainkey.{domain}.'
    logger.info('DNS-lookup: %s', name)
    try:
        a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout)
    except dns.resolver.NXDOMAIN:
        logger.critical('Domain %s does not exist', name)
        sys.exit(1)
    except dns.exception.DNSException as ex:
        # Timeouts, unreachable nameservers and similar resolver failures
        logger.critical('DNS lookup for %s failed: %s', name, ex)
        sys.exit(1)

    parts = None
    for rrset in a.response.answer:
        if rrset.rdtype != dns.rdatatype.TXT:
            # e.g. a CNAME that was followed on the way to the TXT record
            continue
        for rdata in rrset:
            # A TXT record is a sequence of short strings that have to be
            # concatenated; larger keys do not fit into a single string
            record = b''.join(rdata.strings).decode(errors='replace')
            candidate = get_parts_from_header(record)
            if candidate.get('v') != 'DKIM1':
                continue
            # k= is optional in a DKIM key record and defaults to rsa
            if candidate.get('k', 'rsa') == wantkey:
                parts = candidate
                break
        if parts is not None:
            break

    if parts is None:
        logger.critical('Domain %s does not contain a DKIM record', name)
        sys.exit(1)
    if not parts.get('p'):
        logger.critical('Domain %s does not contain a DKIM key', name)
        sys.exit(1)
    return parts['p']
def get_git_toplevel(gitdir: str = None) -> str:
    """Ask git for the top-level directory of the working tree.

    :param gitdir: optional value passed to git via --git-dir
    :return: the stripped output of ``git rev-parse --show-toplevel``, or an
             empty string when git exits with a non-zero status
    """
    gitdir_opts = ['--git-dir', gitdir] if gitdir else []
    status, stdout, _stderr = _run_command(['git'] + gitdir_opts + ['rev-parse', '--show-toplevel'])
    if status != 0:
        return ''
    return stdout.decode().strip()
def get_wk_key(domain: str, selector: str, timeout: int = 5) -> str:
    """Fetch a DKIM-style key record from the domain's .well-known location.

    The record is retrieved from
    ``https://<domain>/.well-known/_domainkey/<selector>.txt`` and parsed
    like a DKIM key record.

    :param domain: domain to retrieve the record from (d= tag)
    :param selector: name of the record (s= tag)
    :param timeout: HTTP request timeout, in seconds
    :return: the base64 value of the record's p= tag
    :raises SystemExit: if the record cannot be retrieved or has no p= tag
    """
    # The selector comes from a message header; escape it so it always
    # stays a single path segment
    qselector = urllib.parse.quote(selector, safe='')
    wkurl = f'https://{domain}/.well-known/_domainkey/{qselector}.txt'
    logger.info('Retrieving: %s', wkurl)
    try:
        res = requests.get(wkurl, timeout=timeout)
    except requests.exceptions.RequestException as ex:
        logger.critical('Could not retrieve %s: %s', wkurl, ex)
        sys.exit(1)
    if res.status_code != 200:
        logger.critical('Could not retrieve %s: %s', wkurl, res.status_code)
        sys.exit(1)
    keydata = res.content.decode().strip()
    logger.debug('keydata: %s', keydata)
    parts = get_parts_from_header(keydata)
    if 'p' not in parts:
        logger.critical('%s does not contain a DKIM key', wkurl)
        sys.exit(1)
    return parts['p']
def get_wkd_key(domain: str, identity: str, selector: str, timeout: int = 5) -> str:
    """Find a developer key, first in the local git tree, then over HTTPS.

    :param domain: signing domain (d= tag)
    :param identity: signing identity (i= tag); only the part before @ is used
    :param selector: key selector (s= tag)
    :param timeout: HTTP request timeout, in seconds
    :return: the first line of the in-git key file if one exists, otherwise
             the stripped body of the HTTPS response
    :raises SystemExit: if the key cannot be retrieved over HTTPS
    """
    identity = identity.split('@')[0]
    # Attempt to load from local git dir if we are in a git dir
    gittop = get_git_toplevel()
    if gittop:
        # urlencode domain/identity/selector to make sure nobody tries path-based badness
        qdomain = urllib.parse.quote_plus(domain)
        qidentity = urllib.parse.quote_plus(identity)
        qselector = urllib.parse.quote_plus(selector)
        # quote_plus() escapes "/" but leaves dots alone, so "." and ".."
        # have to be refused explicitly to stay below .keys/devkey
        if qdomain not in ('.', '..') and qidentity not in ('.', '..'):
            subpath = os.path.join(gittop, '.keys', 'devkey', qdomain, qidentity, qselector + '.txt')
            try:
                with open(subpath) as fh:
                    logger.info('Loading: WKD key from %s', subpath)
                    return fh.readline().strip()
            except IOError:
                # No usable in-git key; fall through to the network lookup
                pass
    # We use zbase32 because this is what OpenPGP uses.
    # I'm not convinced this is a sane choice, if only because
    # it produces a swear word for my account record.
    i = hashlib.sha1(identity.lower().encode()).digest()
    zdir = anybase32.encode(i, anybase32.ZBASE32).decode()
    # The selector comes from a message header; keep it a single path segment
    uselector = urllib.parse.quote(selector, safe='')
    wkurl = f'https://{domain}/.well-known/devkey/{zdir}/{uselector}.txt'
    logger.info('Retrieving: %s', wkurl)
    try:
        res = requests.get(wkurl, timeout=timeout)
    except requests.exceptions.RequestException as ex:
        logger.critical('Could not get %s: %s', wkurl, ex)
        sys.exit(1)
    if res.status_code != 200:
        logger.critical('Could not get %s: %s', wkurl, res.status_code)
        sys.exit(1)
    # For POC purposes, we are not doing caching or TOFU management,
    # but this would be a required part of actual implementation
    return res.content.decode().strip()
def get_b64_attestation(msg) -> Tuple[str, str, str]:
    """Return the message's attestation hashes as base64 strings.

    :param msg: parsed email message, handed to b4.LoreMessage
    :return: tuple of the base64-encoded (i, m, p) attestation values
    """
    def _hex_to_b64(hexdigest):
        # b4 keeps hexdigests, but base64 is the more natural encoding for
        # email headers and saves a few bytes of space
        return base64.b64encode(binascii.unhexlify(hexdigest)).decode()

    lore = b4.LoreMessage(msg)
    lore.load_hashes()
    attestation = lore.attestation
    return _hex_to_b64(attestation.i), _hex_to_b64(attestation.m), _hex_to_b64(attestation.p)
def add_hashes_header(msg):
    """Add a freshly generated hashes header to ``msg`` and return the message."""
    msg[XPH_HDR] = gen_hashes_header(msg)
    return msg
def gen_hashes_header(msg):
    """Build the value of the hashes header for ``msg``.

    :param msg: parsed email message to compute attestation hashes for
    :return: an email.header.Header holding the v=, h=, i=, m= and p= tags
    """
    i_hash, m_hash, p_hash = get_b64_attestation(msg)
    # The hash algorithm is hardcoded to sha256 for the purposes of the POC
    hval = f'v=1; h=sha256; i={i_hash}; m={m_hash}; p={p_hash}'
    return email.header.make_header([(hval.encode(), 'us-ascii')], maxlinelen=78)
def get_parts_from_header(hstr: str) -> dict:
    """Parse a ``tag=value; tag=value`` header string into a dict.

    All whitespace is removed before parsing. Chunks without an ``=`` are
    ignored, and when a tag repeats the last value wins.

    :param hstr: header value to parse
    :return: mapping of tag name to value
    """
    compact = re.sub(r'\s*', '', hstr)
    # partition() splits on the first "=" only, so values may contain "="
    pairs = (chunk.partition('=') for chunk in compact.split(';'))
    return {tag: value for tag, sep, value in pairs if sep}
def dkim_canonicalize_header(hname: str, hval: str) -> Tuple[str, str]:
    """Normalise a header name and value before hashing.

    The name is lowercased. The value is stripped, has its newlines removed
    and every remaining run of whitespace collapsed into a single space.

    :param hname: header name
    :param hval: header value, possibly folded over several lines
    :return: tuple of (normalised name, normalised value)
    """
    unfolded = re.sub(r'\n', '', hval.strip())
    return hname.lower(), re.sub(r'\s+', ' ', unfolded)
def verify_attestation_hashes(msg) -> bool:
    """Compare the hashes claimed in the hashes header with computed ones.

    A PASS/FAIL line is logged for each of the three attestation parts,
    followed by an overall verdict.

    :param msg: parsed email message carrying the hashes header
    :return: True only if all three claimed hashes match the computed ones
    """
    hhdr = msg.get(XPH_HDR)
    hdata = get_parts_from_header(str(hhdr)) if hhdr is not None else dict()
    desc = {
        'i': 'metadata',
        'm': 'commit message',
        'p': 'diff content',
    }
    adata = dict(zip(('i', 'm', 'p'), get_b64_attestation(msg)))
    verified = True
    logger.info('----- ---------------')
    for part, what in desc.items():
        # A tag that is absent from the header counts as a failed hash
        # instead of aborting with a KeyError
        if hdata.get(part) == adata[part]:
            status = 'PASS'
        else:
            verified = False
            status = 'FAIL'
        logger.info('%s : %s', status, what)
    logger.info('----- ---------------')
    if verified:
        logger.info('PASS : All hashes verified')
    else:
        logger.info('FAIL : Some or all hashes failed verification')
    return verified
def gpg_verify(smsg: bytes, dsig: bytes, keyid: str) -> bool:
    """Verify a detached PGP signature with gpg.

    The key is looked for in the default keyring first. If it is not there
    and we are inside a git tree that ships the key under .keys/openpgp,
    the in-git pubring is used instead. Signatures checked against the
    default keyring additionally require full or ultimate trust.

    :param smsg: the signed content, passed to gpg on stdin
    :param dsig: the detached signature
    :param keyid: id of the key the signature claims to be made with
    :return: True if gpg reports a good, valid (and trusted) signature
    :raises SystemExit: if the key cannot be found in any keyring
    """
    if keyid.startswith('-'):
        # keyid comes from a message header; never let gpg read it as an option
        logger.critical('Invalid key id: %s', keyid)
        return False
    # Do we have this key in our default keyring?
    pubring = None
    keyring_args = list()
    ecode, out, err = gpg_run_command(['--list-key', keyid])
    if ecode > 0:
        # See if it's in git itself
        gittop = get_git_toplevel()
        if gittop:
            keysdir = os.path.join(gittop, '.keys', 'openpgp')
            # URLencode keyid to avoid path-based badness
            keyfile = os.path.join(keysdir, 'keys', urllib.parse.quote_plus(keyid) + '.asc')
            if os.path.exists(keyfile):
                pubring = os.path.join(keysdir, 'pubring.kbx')
        if pubring and os.path.exists(pubring):
            logger.info('Loading: in-git pubring: %s', pubring)
            keyring_args = ['--no-default-keyring', '--keyring', pubring]
        else:
            logger.critical('Unable to find key %s', keyid)
            sys.exit(1)

    # We can't pass both the detached sig and the content on stdin, so
    # the signature goes into a temporary file. It is created only after
    # the key lookup so that the exit above cannot leave it behind.
    fd, savefile = mkstemp(prefix='attpoc-pgp-verify-')
    try:
        # fdopen() takes ownership of the descriptor returned by mkstemp()
        with os.fdopen(fd, 'wb') as fh:
            fh.write(dsig)
        gpgargs = keyring_args + ['--verify', '--status-fd=1', savefile, '-']
        ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
    finally:
        os.unlink(savefile)

    if ecode > 0:
        logger.critical('PGP signature failed to verify')
        logger.critical(err.decode(errors='replace'))
        return False
    output = out.decode(errors='replace')
    # We're looking for GOODSIG and VALIDSIG
    # For the purposes of this POC, we're not doing the following, but normally would:
    # - check UIDs to make sure that they match From:
    # - check signature date for its drift from Date:
    gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', output, re.M)
    vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
    if gs_matches and vs_matches:
        signer = gs_matches.groups()[1]
        if not pubring:
            # If we're not using an in-git pubring, also check TRUST_* output
            ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
            if not ts_matches:
                logger.critical('Insufficient TRUST on: %s', signer)
                return False
        logger.info('PGP signature verified: %s', signer)
        return True
    # Theoretically, this would have resulted in ecode > 0
    logger.critical('PGP signature failed to verify')
    logger.critical(err.decode(errors='replace'))
    return False
def cmd_hashes_hdr(cmdargs) -> None:
    """Write the hashes header for the message in ``cmdargs.message`` to stdout.

    :param cmdargs: parsed command-line arguments; only ``message`` is read
    """
    hhdr = gen_hashes_header(load_message(cmdargs.message))
    logger.info('--- HEADER STARTS ---')
    # Emit raw bytes so that the CRLF line ending is written unchanged
    line = f'{XPH_HDR}: {hhdr.encode()}\r\n'
    sys.stdout.buffer.write(line.encode())
def cmd_sign_dkim(cmdargs) -> None:
    """Add a hashes header and a DKIM-Signature to a message, write it to stdout.

    :param cmdargs: parsed command-line arguments; reads ``message``,
                    ``domain``, ``selector``, ``identity`` and ``privkey``
    :raises SystemExit: if the private key file cannot be read
    """
    logger.info('Signing: plain DKIM')
    msg = load_message(cmdargs.message)
    # We use dkimpy here as a demonstration of an external DKIM compliant
    # implementation generating the DKIM-Signature tag, complete with bh=
    # body hash that we don't consider for our purposes:
    # - it will most certainly get mangled by mailing-list software
    # - if it's canonicalized with "relaxed", we can no longer consider
    #   patch content to be trusted, as "relaxed" canonicalization modifies
    #   whitespace, which allows sneaking in maliciously modified patches
    #   in languages with syntactic whitespace.
    msg = add_hashes_header(msg)
    domain = cmdargs.domain.encode()
    selector = cmdargs.selector.encode()
    identity = cmdargs.identity.encode()
    include_headers = [b'from', b'date', b'x-patch-hashes']
    # Fail with a clean message, the same way cmd_sign_dk does
    try:
        with open(cmdargs.privkey, 'rb') as fh:
            logger.info('Using %s to sign', cmdargs.privkey)
            privkey = fh.read()
    except IOError:
        logger.critical('Could not open %s', cmdargs.privkey)
        sys.exit(1)
    dk = dkim.DKIM(msg.as_bytes())
    bhdr = dk.sign(selector, domain, privkey, identity=identity, include_headers=include_headers)
    dhdr = bhdr.decode()
    # sign() returns the complete header line; keep only the part after the name
    msg['DKIM-Signature'] = dhdr.split(':', 1)[1]
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
def cmd_verify_dkim(msg) -> None:
    """Verify a plain DKIM signature that covers the hashes header.

    Only the header part of the signature is checked (rsa-sha256 with
    relaxed header canonicalization is assumed); the bh= body hash is
    ignored on purpose. On success the attestation hashes are verified too.

    :param msg: parsed email message
    :raises SystemExit: with status 1 on any verification failure
    """
    logger.info('Verifying: Plain DKIM')
    # We don't use dkimpy to verify, as it will force verification of bh=, which
    # we intentionally choose not to use for reasons listed above.
    # However, our implementation is simplistic and doesn't cover many aspects
    # of the DKIM standard, so real-life implementations need to consider using
    # feature-complete DKIM verification tools, assuming they allow ignoring the
    # bh= field.
    dks = msg.get('dkim-signature')
    if dks is None:
        logger.critical('Message does not contain a DKIM-Signature header')
        sys.exit(1)
    dks = str(dks)
    ddata = get_parts_from_header(dks)
    missing = [tag for tag in ('d', 's', 'h', 'b') if tag not in ddata]
    if missing:
        logger.critical('DKIM-Signature is missing required tags: %s', ', '.join(missing))
        sys.exit(1)
    # Header names are case-insensitive, so compare them lowercased
    included = [hname.lower() for hname in ddata['h'].split(':')]
    # Make sure the x-patch-hashes header is included
    if XPH_HDR.lower() not in included:
        logger.critical('%s is not included in signed headers, unable to verify!', XPH_HDR)
        sys.exit(1)
    try:
        pk = base64.b64decode(get_dkim_key(ddata['d'], ddata['s']))
        sig = base64.b64decode(ddata['b'])
        key = RSA.import_key(pk)
    except (ValueError, TypeError) as ex:
        # binascii.Error (bad base64) is a ValueError subclass
        logger.critical('FAIL: Could not load the DKIM key or signature: %s', ex)
        sys.exit(1)
    headers = list()
    for header in included:
        # For the POC, we assume 'relaxed/'
        hval = msg.get(header)
        if hval is None:
            # Missing headers are omitted by the DKIM RFC
            continue
        hname, hval = dkim_canonicalize_header(header, str(hval))
        headers.append(f'{hname}:{hval}')
    # Now we add the dkim-signature header itself, without b= content
    dname, dval = dkim_canonicalize_header('dkim-signature', dks)
    # Only the value of b= is blanked; tags that follow it stay in place
    dval = re.sub(r'((?:^|;)\s*b=)[^;]*', r'\1', dval, count=1)
    headers.append(f'{dname}:{dval}')
    payload = ('\r\n'.join(headers)).encode()
    hashed = SHA256.new(payload)
    try:
        # noinspection PyTypeChecker
        pkcs1_15.new(key).verify(hashed, sig)
    except (ValueError, TypeError):
        logger.critical('FAIL: The DKIM signature did NOT verify!')
        sys.exit(1)
    # i= is optional in DKIM and defaults to "@" followed by the d= value
    identity = ddata.get('i', '@' + ddata['d'])
    if not verify_identity_domain(msg, identity, ddata['d']):
        sys.exit(1)
    logger.info('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
    # A hash mismatch has to be visible in the exit status as well
    if not verify_attestation_hashes(msg):
        sys.exit(1)
def cmd_sign_pgp(cmdargs) -> None:
    """Add hashes and PGP signature headers to a message, write it to stdout.

    :param cmdargs: parsed command-line arguments; reads ``message``,
                    ``identity``, ``keyid`` and ``subkeyid``
    :raises SystemExit: with gpg's exit code if signing fails
    """
    logger.info('Signing: PGP')
    msg = load_message(cmdargs.message)
    # selector is the key id of the [C] public key, e.g. 0xE63EDCA9329DD07E.
    # identity is the UID we should look for on the key
    # The signature can be made with a subkey, so we embed the key ID into
    # the header for lookup convenience.
    # We don't embed signing time, as it's part of the PGP signature.
    hashes_hdr = gen_hashes_header(msg)
    msg[XPH_HDR] = hashes_hdr
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, hashes_hdr.encode())

    sig_tags = '; '.join(['m=pgp', f'i={cmdargs.identity}', f's=0x{cmdargs.keyid}', 'b='])
    shname, shval = dkim_canonicalize_header(XPS_HDR, sig_tags)

    # What gets signed: the hashes header, then the sig header with empty b=
    payload = '\r\n'.join([f'{hhname}:{hhval}', f'{shname}:{shval}']).encode()

    # The trailing "!" is appended whenever a subkey id is given
    signer = f'{cmdargs.subkeyid}!' if cmdargs.subkeyid else cmdargs.keyid
    ecode, out, err = gpg_run_command(['-b', '-u', signer], payload)
    if ecode > 0:
        logger.critical('Running gpg failed')
        logger.critical(err.decode())
        sys.exit(ecode)

    shval += splitter(base64.b64encode(out).decode())
    msg[XPS_HDR] = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
def cmd_pgp_verify(msg):
    """Verify a mode=pgp signature header and then the attestation hashes.

    :param msg: parsed email message carrying both attestation headers
    :raises SystemExit: with status 1 if the header is unusable, the PGP
                        signature does not verify or the hashes do not match
    """
    logger.info('Verifying: %s (mode=pgp)', XPS_HDR)
    shdr = msg.get(XPS_HDR)
    if shdr is None:
        logger.critical('Message does not contain %s', XPS_HDR)
        sys.exit(1)
    shdr = str(shdr)
    sdata = get_parts_from_header(shdr)
    missing = [tag for tag in ('s', 'b') if tag not in sdata]
    if missing:
        logger.critical('%s is missing required tags: %s', XPS_HDR, ', '.join(missing))
        sys.exit(1)
    try:
        sig = base64.b64decode(sdata['b'])
    except ValueError:
        # binascii.Error (bad base64) is a ValueError subclass
        logger.critical('FAIL : %s does not contain a valid signature', XPS_HDR)
        sys.exit(1)
    headers = list()
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, str(msg.get(XPH_HDR)))
    headers.append(f'{hhname}:{hhval}')
    # Now we add the sig header itself, without b= content
    shname, shval = dkim_canonicalize_header(XPS_HDR, shdr)
    shval = shval.split('; b=', 1)[0] + '; b='
    headers.append(f'{shname}:{shval}')
    payload = ('\r\n'.join(headers)).encode()
    keyid = re.sub(r'^0x', '', sdata['s'])
    # Returning normally is treated as success by cmd_verify, so every
    # failure has to end the process with a non-zero status
    if not gpg_verify(payload, sig, keyid):
        sys.exit(1)
    if not verify_attestation_hashes(msg):
        sys.exit(1)
def cmd_sign_dk(cmdargs, mode='dk'):
    """Add hashes and ed25519 signature headers to a message, write it to stdout.

    :param cmdargs: parsed command-line arguments; reads ``message``,
                    ``domain``, ``identity``, ``selector`` and ``privkey``
    :param mode: value recorded in the m= tag of the signature header
    :raises SystemExit: if the private key cannot be read or loaded
    """
    logger.info('Signing: %s (mode=%s)', XPS_HDR, mode)
    msg = load_message(cmdargs.message)
    # selector is the same as in DKIM, the leftmost part of foo._domainkey.example.org
    # identity should match domain in from
    headers = list()
    hhdr = gen_hashes_header(msg)
    msg[XPH_HDR] = hhdr
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, hhdr.encode())
    headers.append(f'{hhname}:{hhval}')
    signtime = str(int(time.time()))
    hparts = [
        f'm={mode}',
        f'd={cmdargs.domain}',
        f'i={cmdargs.identity}',
        f's={cmdargs.selector}',
        f't={signtime}',
        'a=ed25519-sha256',
        'b=',
    ]
    shname, shval = dkim_canonicalize_header(XPS_HDR, '; '.join(hparts))
    headers.append(f'{shname}:{shval}')
    payload = '\r\n'.join(headers).encode()
    hashed = hashlib.sha256()
    hashed.update(payload)
    try:
        with open(cmdargs.privkey, 'r') as fh:
            sk = SigningKey(fh.read(), encoder=Base64Encoder)
    except IOError:
        logger.critical('Could not open %s', cmdargs.privkey)
        sys.exit(1)
    except (ValueError, TypeError) as ex:
        # Bad base64 or a key of the wrong size
        logger.critical('Could not load private key from %s: %s', cmdargs.privkey, ex)
        sys.exit(1)
    # The signature is made over the sha256 digest of the two headers
    bdata = sk.sign(hashed.digest(), encoder=Base64Encoder)
    shval += splitter(bdata.decode())
    shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    msg[XPS_HDR] = shdr
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
def cmd_verify_dk(msg, mode='dk'):
    """Verify an ed25519 signature header and then the attestation hashes.

    :param msg: parsed email message carrying both attestation headers
    :param mode: where the public key comes from: 'dk' (DNS), 'wk'
                 (.well-known/_domainkey) or 'wkd' (in-git or
                 .well-known/devkey)
    :raises SystemExit: with status 1 on any verification failure
    """
    logger.info('Verifying: %s (mode=%s)', XPS_HDR, mode)
    shdr = msg.get(XPS_HDR)
    if shdr is None:
        logger.critical('Message does not contain %s', XPS_HDR)
        sys.exit(1)
    shdr = str(shdr)
    sdata = get_parts_from_header(shdr)
    missing = [tag for tag in ('d', 'i', 's', 'b') if tag not in sdata]
    if missing:
        logger.critical('%s is missing required tags: %s', XPS_HDR, ', '.join(missing))
        sys.exit(1)
    headers = list()
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, str(msg.get(XPH_HDR)))
    headers.append(f'{hhname}:{hhval}')
    # Now we add the sig header itself, without b= content
    shname, shval = dkim_canonicalize_header(XPS_HDR, shdr)
    shval = shval.split('; b=', 1)[0] + '; b='
    headers.append(f'{shname}:{shval}')
    payload = ('\r\n'.join(headers)).encode()
    hashed = hashlib.sha256()
    hashed.update(payload)
    if mode == 'dk':
        pk = get_dkim_key(sdata['d'], sdata['s'], wantkey='ed25519')
    elif mode == 'wk':
        pk = get_wk_key(sdata['d'], sdata['s'])
    elif mode == 'wkd':
        pk = get_wkd_key(sdata['d'], sdata['i'], sdata['s'])
    else:
        logger.critical('Unknown mode: %s', mode)
        sys.exit(1)
    try:
        vk = VerifyKey(pk, encoder=Base64Encoder)
        # b= holds the signed digest; verify() hands the digest back
        signed = vk.verify(sdata['b'].encode(), encoder=Base64Encoder)
    except (BadSignatureError, ValueError, TypeError):
        # Malformed key or signature material is a failed verification too
        signed = None
    if signed != hashed.digest():
        logger.critical('FAIL : mode=%s signature verification for: d=%s, i=%s, s=%s', mode,
                        sdata['d'], sdata['i'], sdata['s'])
        sys.exit(1)
    if not verify_identity_domain(msg, sdata['i'], sdata['d']):
        sys.exit(1)
    logger.info('PASS : mode=%s signature verified for: d=%s, i=%s, s=%s', mode,
                sdata['d'], sdata['i'], sdata['s'])
    # A hash mismatch has to be visible in the exit status as well
    if not verify_attestation_hashes(msg):
        sys.exit(1)
def cmd_sign_wk(cmdargs):
    """Create a mode=wk signature by delegating to cmd_sign_dk."""
    cmd_sign_dk(cmdargs, 'wk')
def cmd_verify_wk(msg):
    """Verify a mode=wk signature by delegating to cmd_verify_dk."""
    cmd_verify_dk(msg, 'wk')
def cmd_sign_wkd(cmdargs):
    """Create a mode=wkd signature by delegating to cmd_sign_dk."""
    cmd_sign_dk(cmdargs, 'wkd')
def cmd_verify_wkd(msg):
    """Verify a mode=wkd signature by delegating to cmd_verify_dk."""
    cmd_verify_dk(msg, 'wkd')
def cmd_verify(cmdargs):
    """Verify the attestation on a message and exit with the result.

    Without a signature header the message is checked as plain DKIM.
    Otherwise the m= tag of the signature header selects the verifier.
    The process exits with status 0 once the chosen verifier returns.

    :param cmdargs: parsed command-line arguments; only ``message`` is read
    :raises SystemExit: always
    """
    msg = load_message(cmdargs.message)
    # do we have a hashes header?
    if not msg.get(XPH_HDR):
        logger.critical('Message does not contain %s, nothing to verify', XPH_HDR)
        sys.exit(1)
    # do we have a sig header?
    shdr = msg.get(XPS_HDR)
    if not shdr:
        if not msg.get('dkim-signature'):
            logger.critical('Message contains unsigned hashes, cannot verify')
            sys.exit(1)
        # Run as a plain dkim verification
        cmd_verify_dkim(msg)
        sys.exit(0)
    verifiers = {
        'pgp': cmd_pgp_verify,
        'dk': cmd_verify_dk,
        'wk': cmd_verify_wk,
        'wkd': cmd_verify_wkd,
    }
    # A header without m= is reported as an unknown mode, not as a KeyError
    mode = get_parts_from_header(str(shdr)).get('m')
    verifier = verifiers.get(mode)
    if verifier is None:
        logger.critical('Unknown mode: %s', mode)
        sys.exit(1)
    verifier(msg)
    sys.exit(0)
if __name__ == '__main__':
    # Command-line entry point: build the parser, configure logging and
    # dispatch to the function registered for the chosen sub-command
    import argparse
    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        prog='main',
        description='A proof of concept tool for header-based email patch attestation',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-m', '--message', default='emails/unsigned.eml',
                        help='File with the message to work with')
    # A plain flag; without store_true argparse would demand a value for -v
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Print extra debugging output')
    subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')
    # hashes-hdr
    sp_hhdr = subparsers.add_parser('hashes-hdr', help='Create a hashes header')
    sp_hhdr.set_defaults(func=cmd_hashes_hdr)
    # sign-dkim
    sp_sdkim = subparsers.add_parser('sign-dkim', help='Create a DKIM signature')
    sp_sdkim.add_argument('-i', '--identity', default='@example.org')
    sp_sdkim.add_argument('-d', '--domain', default='example.org')
    sp_sdkim.add_argument('-s', '--selector', default='patches')
    sp_sdkim.add_argument('-k', '--privkey', default='rsa.key')
    sp_sdkim.set_defaults(func=cmd_sign_dkim)
    # sign-dk
    sp_sdk = subparsers.add_parser('sign-dk', help='Create a mode=dk signature')
    sp_sdk.add_argument('-i', '--identity', default='@example.org')
    sp_sdk.add_argument('-d', '--domain', default='example.org')
    sp_sdk.add_argument('-s', '--selector', default='patches')
    sp_sdk.add_argument('-k', '--privkey', default='dk.key')
    sp_sdk.set_defaults(func=cmd_sign_dk)
    # sign-wk
    sp_swk = subparsers.add_parser('sign-wk', help='Create a mode=wk signature')
    sp_swk.add_argument('-i', '--identity', default='@example.org')
    sp_swk.add_argument('-d', '--domain', default='example.org')
    sp_swk.add_argument('-s', '--selector', default='patches')
    sp_swk.add_argument('-k', '--privkey', default='dk.key')
    sp_swk.set_defaults(func=cmd_sign_wk)
    # sign-wkd
    sp_swkd = subparsers.add_parser('sign-wkd', help='Create a mode=wkd signature')
    sp_swkd.add_argument('-i', '--identity', default='dev@kernel.org')
    sp_swkd.add_argument('-d', '--domain', default='example.org')
    sp_swkd.add_argument('-s', '--selector', default='patches')
    sp_swkd.add_argument('-k', '--privkey', default='ingit.key')
    sp_swkd.set_defaults(func=cmd_sign_wkd)
    # sign-pgp
    sp_spgp = subparsers.add_parser('sign-pgp', help='Create a PGP signature')
    sp_spgp.add_argument('-i', '--identity', default='dev@kernel.org')
    sp_spgp.add_argument('-k', '--keyid', default='AAAABBBBCCCCDDDD')
    sp_spgp.add_argument('-u', '--subkeyid')
    sp_spgp.set_defaults(func=cmd_sign_pgp)
    # verify
    sp_verify = subparsers.add_parser('verify', help='Verify a signed message')
    sp_verify.set_defaults(func=cmd_verify)
    args = parser.parse_args()
    # The logger passes everything on; the handler level decides what is shown
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if args.verbose:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)
    # No sub-command given: nothing has registered a func default
    if 'func' not in args:
        parser.print_help()
        sys.exit(1)
    args.func(args)