| #!/usr/bin/env python3 |
| # Copyright (C) 2020 by the Linux Foundation |
| # SPDX-License-Identifier: MIT-0 |
| import sys |
| import os |
| import base64 |
| import email |
| import email.header |
| import email.utils |
| import b4 |
| import time |
| import dkim |
| import re |
| import dns.resolver |
| import binascii |
| import subprocess |
| import hashlib |
| import requests |
| import anybase32 |
| import urllib.parse |
| import logging |
| import datetime |
| |
| from nacl.signing import SigningKey, VerifyKey |
| from nacl.encoding import Base64Encoder |
| from nacl.exceptions import BadSignatureError |
| |
| from tempfile import mkstemp |
| |
| from Cryptodome.Signature import pkcs1_15 |
| from Cryptodome.Hash import SHA256 |
| from Cryptodome.PublicKey import RSA |
| |
| from typing import Tuple |
| |
# Names of the custom attestation headers this POC adds to messages:
# X-Patch-Hashes carries the i=/m=/p= attestation hashes,
# X-Patch-Sig carries the signature over that header.
XPH_HDR = 'X-Patch-Hashes'
XPS_HDR = 'X-Patch-Sig'

# For POC purposes, we hardcode maximum drift to 30 days
MAXDRIFT = datetime.timedelta(days=30)

# Module-level logger; handlers and levels are configured in __main__
logger = logging.getLogger(__name__)
| |
| |
| def _run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]: |
| sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) |
| (output, error) = sp.communicate(input=stdin) |
| return sp.returncode, output, error |
| |
| |
def gpg_run_command(cmdargs: list, stdin: bytes = None) -> Tuple[int, bytes, bytes]:
    """Invoke gpg in batch mode with key auto-retrieval and trustdb checks off."""
    fullargs = ['gpg', '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
    fullargs.extend(cmdargs)
    return _run_command(fullargs, stdin)
| |
| |
def load_message(msgfile: str):
    """Read msgfile and parse its contents into an email.message.Message."""
    with open(msgfile, 'rb') as fh:
        logger.info('Using %s as message source', msgfile)
        return email.message_from_bytes(fh.read())
| |
| |
def splitter(longstr: str, limit: int = 77) -> str:
    """Fold a long string into space-joined chunks no longer than limit.

    The first chunk is two characters shorter than the rest, leaving room
    for the header prefix on the initial folded line.
    """
    chunks = []
    width = limit - 2  # first chunk only
    while len(longstr) > limit:
        chunks.append(longstr[:width])
        longstr = longstr[width:]
        width = limit
    chunks.append(longstr)
    return ' '.join(chunks)
| |
| |
def verify_identity_domain(msg, identity: str, domain: str) -> bool:
    """Check that the signing identity agrees with d= and the From: header.

    Returns True when domain is a suffix of identity and the From address
    shares identity's domain part; logs and returns False otherwise.
    """
    # Domain is supposed to be present in identity
    # NOTE(review): plain suffix match means d=example.org also accepts
    # i=user@evilexample.org -- acceptable for a POC, not for production
    if not identity.endswith(domain):
        logger.critical('FAIL : domain (d=%s) is not in identity (i=%s)', domain, identity)
        return False
    fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
    if identity.find('@') < 0:
        logger.critical('FAIL : identity must contain @ (i=%s)', identity)
        return False
    # rsplit with maxsplit=1: RFC 6376 permits @ inside the i= local part,
    # which would make a bare split('@') raise ValueError on unpacking
    _ilocal, idomain = identity.rsplit('@', 1)
    # identity is supposed to be present in from
    if not fromeml.endswith(f'@{idomain}'):
        logger.critical('FAIL : identity (i=%s) does not match from (from=%s)', identity, fromeml)
        return False
    logger.info('PASS : identity and domain match From header')
    return True
| |
| |
def verify_time_drift(msg, timestamp: str) -> bool:
    """Check that the signature time t= is within MAXDRIFT of the Date header.

    :param msg: parsed email message providing the Date header
    :param timestamp: t= value from the signature header (epoch seconds)
    :return: True when the absolute drift is within MAXDRIFT
    """
    msgdt = email.utils.parsedate_to_datetime(str(msg['Date']))
    # fromtimestamp(..., tz=utc) replaces the deprecated utcfromtimestamp()
    sigdt = datetime.datetime.fromtimestamp(int(timestamp), tz=datetime.timezone.utc)
    # Use the magnitude: a Date far in the future of the signing time is
    # just as suspicious as one far in the past
    sdrift = abs(sigdt - msgdt)
    if sdrift > MAXDRIFT:
        logger.critical('FAIL : time drift between Date and t too great (%s)', sdrift)
        return False
    logger.info('PASS : time drift between Date and t (%s)', sdrift)
    return True
| |
| |
def get_dkim_key(domain: str, selector: str, wantkey: str = 'rsa', timeout: int = 5) -> str:
    """Look up the DKIM TXT record for selector/domain and return its p= value.

    Exits the program when the record is missing, malformed, or does not
    mention the wanted key type.
    """
    name = f'{selector}._domainkey.{domain}.'
    logger.info('DNS-lookup: %s', name)
    keydata = None
    try:
        a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout)
        # Find a v=DKIM1 record that also mentions the wanted key type
        # NOTE(review): records without a k= tag default to rsa per RFC 6376
        # and would be rejected by the substring test -- fine for a POC
        for rrset in a.response.answer:
            for item in rrset.items:
                for txt in item.strings:
                    if txt.find(b'v=DKIM1') < 0:
                        continue
                    candidate = txt.decode()
                    if candidate.find(wantkey) >= 0:
                        keydata = candidate
                        break
                if keydata:
                    break
            if keydata:
                break
    except dns.resolver.NXDOMAIN:
        logger.critical('Domain %s does not exist', name)
        sys.exit(1)

    if not keydata:
        logger.critical('Domain %s does not contain a DKIM record', name)
        sys.exit(1)

    parts = get_parts_from_header(keydata)
    if 'p' not in parts:
        logger.critical('Domain %s does not contain a DKIM key', name)
        sys.exit(1)

    return parts['p']
| |
| |
def get_git_toplevel(gitdir: str = None) -> str:
    """Return the top-level path of the current git worktree, or '' on failure."""
    cmdargs = ['git']
    if gitdir:
        cmdargs.extend(['--git-dir', gitdir])
    cmdargs.extend(['rev-parse', '--show-toplevel'])
    ecode, out, _err = _run_command(cmdargs)
    return out.decode().strip() if ecode == 0 else ''
| |
| |
def get_wk_key(domain: str, selector: str, timeout: int = 5) -> str:
    """Retrieve a key record over HTTPS from the .well-known/_domainkey path.

    Exits the program when retrieval fails or the record has no p= value.
    """
    wkurl = f'https://{domain}/.well-known/_domainkey/{selector}.txt'
    logger.info('Retrieving: %s', wkurl)
    res = requests.get(wkurl, timeout=timeout)
    if res.status_code != 200:
        # critical, to match the other fatal key-lookup paths
        logger.critical('Could not retrieve %s: %s', wkurl, res.status_code)
        sys.exit(1)
    keydata = res.content.decode().strip()
    logger.debug('keydata: %s', keydata)
    parts = get_parts_from_header(keydata)
    # Fail with a clear message instead of a bare KeyError on a bad record
    if 'p' not in parts:
        logger.critical('%s does not contain a key record', wkurl)
        sys.exit(1)
    return parts['p']
| |
| |
def get_wkd_key(domain: str, identity: str, selector: str, timeout: int = 5) -> str:
    """Fetch a WKD-style devkey, preferring an in-git copy over HTTPS.

    Only the local part of identity is used for the lookup.
    """
    localpart = identity.split('@')[0]
    # Attempt to load from local git dir if we are in a git dir
    gittop = get_git_toplevel()
    if gittop:
        # urlencode domain/identity/selector to make sure nobody tries path-based badness
        keypath = os.path.join(
            gittop, '.keys', 'devkey',
            urllib.parse.quote_plus(domain),
            urllib.parse.quote_plus(localpart),
            urllib.parse.quote_plus(selector) + '.txt',
        )
        try:
            with open(keypath) as fh:
                logger.info('Loading: WKD key from %s', keypath)
                return fh.readline().strip()
        except IOError:
            # fall through to the network lookup
            pass

    # zbase32-encode the sha1 of the local part, as OpenPGP WKD does
    digest = hashlib.sha1(localpart.lower().encode()).digest()
    zdir = anybase32.encode(digest, anybase32.ZBASE32).decode()
    wkurl = f'https://{domain}/.well-known/devkey/{zdir}/{selector}.txt'
    logger.info('Retrieving: %s', wkurl)
    res = requests.get(wkurl, timeout=timeout)
    if res.status_code != 200:
        logger.critical('Could not get %s: %s', wkurl, res.status_code)
        sys.exit(1)
    # For POC purposes, we are not doing caching or TOFU management,
    # but this would be a required part of actual implementation
    return res.content.decode().strip()
| |
| |
def get_b64_attestation(msg) -> Tuple[str, str, str]:
    """Return the message's (i, m, p) attestation hashes, base64-encoded.

    b4 stores these as hexdigests, but base64 is more compact for use in
    email headers, so we transcode here.
    """
    lmsg = b4.LoreMessage(msg)
    lmsg.load_hashes()
    att = lmsg.attestation
    i, m, p = (
        base64.b64encode(binascii.unhexlify(hexval)).decode()
        for hexval in (att.i, att.m, att.p)
    )
    return i, m, p
| |
| |
def add_hashes_header(msg):
    """Attach a freshly generated X-Patch-Hashes header to msg and return it."""
    msg[XPH_HDR] = gen_hashes_header(msg)
    return msg
| |
| |
def gen_hashes_header(msg):
    """Build the X-Patch-Hashes header value for msg.

    The hash algorithm is hardcoded to sha256 for the purposes of the POC.
    """
    i, m, p = get_b64_attestation(msg)
    hval = f'v=1; h=sha256; i={i}; m={m}; p={p}'
    return email.header.make_header([(hval.encode(), 'us-ascii')], maxlinelen=78)
| |
| |
def get_parts_from_header(hstr: str) -> dict:
    """Parse a 'k1=v1; k2=v2' style header value into a dict.

    All whitespace is stripped first; chunks without an '=' are ignored.
    """
    compact = re.sub(r'\s*', '', hstr)
    hdata = dict()
    for chunk in compact.split(';'):
        key, sep, val = chunk.partition('=')
        if sep:
            hdata[key] = val
    return hdata
| |
| |
def dkim_canonicalize_header(hname: str, hval: str) -> Tuple[str, str]:
    """Apply 'relaxed'-style canonicalization to a header name and value.

    The name is lowercased; newlines are removed from the value and any
    remaining whitespace runs are collapsed to single spaces.
    """
    canonical = hval.replace('\n', '')
    canonical = re.sub(r'\s+', ' ', canonical).strip()
    return hname.lower(), canonical
| |
| |
def verify_attestation_hashes(msg) -> bool:
    """Compare the hashes in X-Patch-Hashes with freshly computed ones.

    Logs a PASS/FAIL line per hash and returns True only if all match.
    """
    hdata = get_parts_from_header(str(msg.get(XPH_HDR)))
    computed = dict(zip(('i', 'm', 'p'), get_b64_attestation(msg)))
    desc = {
        'i': 'metadata',
        'm': 'commit message',
        'p': 'diff content',
    }
    verified = True
    logger.info('----- ---------------')
    for part, what in desc.items():
        if hdata[part] == computed[part]:
            status = 'PASS'
        else:
            status = 'FAIL'
            verified = False
        logger.info('%s : %s', status, what)
    logger.info('----- ---------------')
    if verified:
        logger.info('PASS : All hashes verified')
    else:
        logger.info('FAIL : Some or all hashes failed verification')

    return verified
| |
| |
def gpg_verify(smsg: bytes, dsig: bytes, keyid: str) -> bool:
    """Verify a detached PGP signature over smsg, made by keyid.

    Looks for the key first in the default keyring, then in an in-git
    pubring under .keys/openpgp. When the default keyring is used, the
    key must additionally have FULLY/ULTIMATE trust.
    """
    # We can't pass both the detached sig and the content on stdin, so
    # use a temporary file. Keep and wrap the fd that mkstemp returns --
    # re-opening the path by name used to leak the original descriptor.
    fd, savefile = mkstemp('attpoc-pgp-verify')
    try:
        with os.fdopen(fd, 'wb') as fh:
            fh.write(dsig)
        vrfyargs = ['--verify', '--status-fd=1', savefile, '-']
        # Do we have this key in our default keyring?
        pubring = None
        ecode, out, err = gpg_run_command(['--list-key', keyid])
        if ecode > 0:
            # See if it's in git itself
            gittop = get_git_toplevel()
            if gittop:
                # Do we have that key in git?
                # URLencode keyid to avoid path-based badness
                if os.path.exists(
                        os.path.join(gittop, '.keys', 'openpgp', 'keys', urllib.parse.quote_plus(keyid) + '.asc')):
                    pubring = os.path.join(gittop, '.keys', 'openpgp', 'pubring.kbx')
            if pubring and os.path.exists(pubring):
                logger.info('Loading: in-git pubring: %s', pubring)
                gpgargs = ['--no-default-keyring', '--keyring', pubring] + vrfyargs
            else:
                logger.critical('Unable to find key %s', keyid)
                sys.exit(1)
        else:
            gpgargs = vrfyargs
        ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
    finally:
        # Clean up the temp file even on the sys.exit path above
        os.unlink(savefile)
    if ecode > 0:
        logger.critical('PGP signature failed to verify')
        logger.critical(err.decode())
        return False
    output = out.decode()
    # We're looking for GOODSIG and VALIDSIG
    # For the purposes of this POC, we're not doing the following, but normally would:
    # - check UIDs to make sure that they match From:
    # - check signature date for its drift from Date:
    gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', output, re.M)
    vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
    if gs_matches and vs_matches:
        signer = gs_matches.groups()[1]
        if not pubring:
            # If we're not using an in-git pubring, also check TRUST_* output
            ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
            if not ts_matches:
                logger.critical('Insufficient TRUST on: %s', signer)
                return False

        logger.info('PGP signature verified: %s', signer)
        return True

    # Theoretically, this would have resulted in ecode > 0
    logger.critical('PGP signature failed to verify')
    logger.critical(err.decode())
    return False
| |
| |
def cmd_hashes_hdr(cmdargs) -> None:
    """Print the generated X-Patch-Hashes header for the given message."""
    hhdr = gen_hashes_header(load_message(cmdargs.message))
    logger.info('--- HEADER STARTS ---')
    sys.stdout.buffer.write(b'%s: %s\r\n' % (XPH_HDR.encode(), hhdr.encode().encode()))
| |
| |
def cmd_sign_dkim(cmdargs) -> None:
    """Sign the message with a regular RSA DKIM-Signature header.

    The X-Patch-Hashes header is added first and listed among the signed
    headers, so the attestation hashes are covered by the DKIM signature.
    The signed message is written to stdout.
    """
    logger.info('Signing: plain DKIM')
    msg = load_message(cmdargs.message)
    # We use dkimpy here as a demonstration of an external DKIM compliant
    # implementation generating the DKIM-Signature tag, complete with bh=
    # body hash that we don't consider for our purposes:
    # - it will most certainly get mangled by mailing-list software
    # - if it's canonicalized with "relaxed", we can no longer consider
    #   patch content to be trusted, as "relaxed" canonicalization modifies
    #   whitespace, which allows sneaking in maliciously modified patches
    #   in languages with syntactic whitespace.
    msg = add_hashes_header(msg)
    # dkimpy expects bytes for these parameters
    domain = cmdargs.domain.encode()
    selector = cmdargs.selector.encode()
    identity = cmdargs.identity.encode()
    # Only these headers are signed; the body hash is ignored on verification
    include_headers = [b'from', b'date', b'x-patch-hashes']

    with open(cmdargs.privkey, 'rb') as fh:
        logger.info('Using %s to sign', cmdargs.privkey)
        privkey = fh.read()

    dk = dkim.DKIM(msg.as_bytes())
    bhdr = dk.sign(selector, domain, privkey, identity=identity, include_headers=include_headers)
    dhdr = bhdr.decode()
    # dkimpy returns the full "DKIM-Signature: value" line; keep only the value
    msg['DKIM-Signature'] = dhdr.split(':', 1)[1]
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
| |
| |
def cmd_verify_dkim(msg) -> None:
    """Verify a plain DKIM-signed message, ignoring the bh= body hash.

    Rebuilds the relaxed-canonicalized header payload, checks the RSA
    signature against the DNS-published key, then validates identity,
    time drift and the attestation hashes. Exits non-zero on failure.
    """
    logger.info('Verifying: Plain DKIM')
    # We don't use dkimpy to verify, as it will force verification of bh=, which
    # we intentionally choose not to use for reasons listed above.
    # However, our implementation is simplistic and doesn't cover many aspects
    # of the DKIM standard, so real-life implementations need to consider using
    # feature-complete DKIM verification tools, assuming they allow ignoring the
    # bh= field.
    dks = msg.get('dkim-signature')
    ddata = get_parts_from_header(dks)
    # Make sure the x-patch-hashes header is included
    included = ddata['h'].split(':')
    if XPH_HDR.lower() not in included:
        logger.critical('%s is not included in signed headers, unable to verify!', XPH_HDR)
        sys.exit(1)
    # DNS-published public key (p=) and the signature value (b=)
    pk = base64.b64decode(get_dkim_key(ddata['d'], ddata['s']))
    sig = base64.b64decode(ddata['b'])
    headers = list()

    for header in ddata['h'].split(':'):
        # For the POC, we assume 'relaxed/'
        hval = msg.get(header)
        if hval is None:
            # Missing headers are omitted by the DKIM RFC
            continue
        hname, hval = dkim_canonicalize_header(header, str(msg.get(header)))
        headers.append(f'{hname}:{hval}')
    # Now we add the dkim-signature header itself, without b= content
    dname, dval = dkim_canonicalize_header('dkim-signature', dks)
    dval = dval.rsplit('; b=')[0] + '; b='
    headers.append(f'{dname}:{dval}')
    # CRLF-joined canonical headers are what the signer hashed
    payload = ('\r\n'.join(headers)).encode()
    key = RSA.import_key(pk)
    hashed = SHA256.new(payload)
    try:
        # noinspection PyTypeChecker
        pkcs1_15.new(key).verify(hashed, sig)
    except (ValueError, TypeError):
        logger.critical('FAIL: The DKIM signature did NOT verify!')
        sys.exit(1)
    if not verify_identity_domain(msg, ddata['i'], ddata['d']):
        sys.exit(1)
    if not verify_time_drift(msg, ddata['t']):
        sys.exit(1)
    logger.info('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
    verify_attestation_hashes(msg)
| |
| |
def cmd_sign_pgp(cmdargs) -> None:
    """Sign the attestation hashes with PGP and add an X-Patch-Sig header.

    The detached signature covers the canonicalized X-Patch-Hashes header
    plus the X-Patch-Sig header with an empty b= value; the base64 of the
    signature is then appended as b=. The signed message goes to stdout.
    """
    logger.info('Signing: PGP')
    msg = load_message(cmdargs.message)
    # selector is the key id of the [C] public key, e.g. 0xE63EDCA9329DD07E.
    # identity is the UID we should look for on the key
    # The signature can be made with a subkey, so we embed the key ID into
    # the header for lookup convenience.
    # We don't embed signing time, as it's part of the PGP signature.
    headers = list()
    hhdr = gen_hashes_header(msg)
    msg[XPH_HDR] = hhdr
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, hhdr.encode())
    headers.append(f'{hhname}:{hhval}')

    hparts = [
        'm=pgp',
        f'i={cmdargs.identity}',
        f's=0x{cmdargs.keyid}',
        'b=',
    ]
    shname, shval = dkim_canonicalize_header(XPS_HDR, '; '.join(hparts))
    headers.append(f'{shname}:{shval}')
    payload = '\r\n'.join(headers).encode()
    # Trailing '!' pins gpg to exactly that subkey
    if cmdargs.subkeyid:
        gpgargs = ['-b', '-u', f'{cmdargs.subkeyid}!']
    else:
        gpgargs = ['-b', '-u', cmdargs.keyid]
    ecode, out, err = gpg_run_command(gpgargs, payload)
    if ecode > 0:
        logger.critical('Running gpg failed')
        logger.critical(err.decode())
        sys.exit(ecode)
    bdata = base64.b64encode(out)
    # Fold the base64 signature so the header stays within line limits
    shval += splitter(bdata.decode())
    shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    msg[XPS_HDR] = shdr
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
| |
| |
def cmd_pgp_verify(msg):
    """Verify a PGP-signed X-Patch-Sig header, then the attestation hashes."""
    sig_header = msg.get(XPS_HDR)
    sdata = get_parts_from_header(sig_header)
    dsig = base64.b64decode(sdata['b'])
    # Rebuild the exact payload that was signed: the canonicalized hashes
    # header followed by the sig header with an emptied b= value
    hname, hval = dkim_canonicalize_header(XPH_HDR, str(msg.get(XPH_HDR)))
    sname, sval = dkim_canonicalize_header(XPS_HDR, sig_header)
    sval = sval.rsplit('; b=')[0] + '; b='
    payload = f'{hname}:{hval}\r\n{sname}:{sval}'.encode()
    keyid = re.sub(r'^0x', '', sdata['s'])
    if gpg_verify(payload, dsig, keyid):
        verify_attestation_hashes(msg)
| |
| |
def cmd_sign_dk(cmdargs, mode: str = 'dk'):
    """Sign the attestation hashes with an ed25519 key (dk/wk/wkd modes).

    The signature covers the canonicalized X-Patch-Hashes header plus the
    X-Patch-Sig header with an empty b= value; mode only changes where the
    verifier will look the public key up. Output goes to stdout.
    """
    logger.info('Signing: %s header using %s mode', XPS_HDR, mode)
    msg = load_message(cmdargs.message)
    # selector is the same as in DKIM, the leftmost part of foo._domainkey.example.org
    # identity should match domain in from
    headers = list()
    hhdr = gen_hashes_header(msg)
    msg[XPH_HDR] = hhdr
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, hhdr.encode())
    headers.append(f'{hhname}:{hhval}')
    # Signing time goes into t= and is checked for drift on verification
    signtime = str(int(time.time()))

    hparts = [
        f'm={mode}',
        f'd={cmdargs.domain}',
        f'i={cmdargs.identity}',
        f's={cmdargs.selector}',
        f't={signtime}',
        'a=ed25519-sha256',
        'b=',
    ]
    shname, shval = dkim_canonicalize_header(XPS_HDR, '; '.join(hparts))
    headers.append(f'{shname}:{shval}')
    payload = '\r\n'.join(headers).encode()
    # We sign the sha256 digest of the payload, not the payload itself
    hashed = hashlib.sha256()
    hashed.update(payload)
    try:
        with open(cmdargs.privkey, 'r') as fh:
            sk = SigningKey(fh.read(), encoder=Base64Encoder)
    except IOError:
        logger.critical('Could not open %s', cmdargs.privkey)
        sys.exit(1)

    bdata = sk.sign(hashed.digest(), encoder=Base64Encoder)
    # Fold the base64 signature so the header stays within line limits
    shval += splitter(bdata.decode())
    shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    msg[XPS_HDR] = shdr
    logger.info('--- MESSAGE STARTS ---')
    sys.stdout.buffer.write(msg.as_bytes())
| |
| |
def cmd_verify_dk(msg, mode: str = 'dk'):
    """Verify an ed25519 X-Patch-Sig header (dk/wk/wkd key-lookup modes).

    Rebuilds the signed payload, fetches the public key according to mode,
    verifies the signature and its embedded digest, then checks identity,
    time drift and the attestation hashes. Exits non-zero on failure.
    """
    logger.info('Verifying: %s (mode=%s)', XPS_HDR, mode)
    shdr = msg.get(XPS_HDR)
    sdata = get_parts_from_header(shdr)
    headers = list()
    hhname, hhval = dkim_canonicalize_header(XPH_HDR, str(msg.get(XPH_HDR)))
    headers.append(f'{hhname}:{hhval}')
    # Now we add the sig header itself, without b= content
    shname, shval = dkim_canonicalize_header(XPS_HDR, shdr)
    shval = shval.rsplit('; b=')[0] + '; b='
    headers.append(f'{shname}:{shval}')
    payload = ('\r\n'.join(headers)).encode()
    # The signer signed the sha256 digest of the payload
    hashed = hashlib.sha256()
    hashed.update(payload)
    # Public key location depends on the signing mode
    if mode == 'dk':
        pk = get_dkim_key(sdata['d'], sdata['s'], wantkey='ed25519')
    elif mode == 'wk':
        pk = get_wk_key(sdata['d'], sdata['s'])
    elif mode == 'wkd':
        pk = get_wkd_key(sdata['d'], sdata['i'], sdata['s'])
    else:
        logger.critical('Unknown mode: %s', mode)
        sys.exit(1)

    vk = VerifyKey(pk, encoder=Base64Encoder)
    try:
        # nacl returns the signed content (the digest) on success
        foo = vk.verify(sdata['b'].encode(), encoder=Base64Encoder)
    except BadSignatureError:
        logger.critical('FAIL : mode=%s signature verification for: d=%s, i=%s, s=%s', mode,
                        sdata['d'], sdata['i'], sdata['s'])
        sys.exit(1)
    # The signed digest must match the digest of our rebuilt payload
    if foo != hashed.digest():
        logger.critical('FAIL : mode=%s signature verification for: d=%s, i=%s, s=%s', mode,
                        sdata['d'], sdata['i'], sdata['s'])
        sys.exit(1)

    if not verify_identity_domain(msg, sdata['i'], sdata['d']):
        sys.exit(1)
    if not verify_time_drift(msg, sdata['t']):
        sys.exit(1)
    logger.info('PASS : mode=%s signature verified for: d=%s, i=%s, s=%s', mode,
                sdata['d'], sdata['i'], sdata['s'])
    verify_attestation_hashes(msg)
| |
| |
def cmd_sign_wk(cmdargs):
    """Sign using a key published at a .well-known URL (mode=wk)."""
    cmd_sign_dk(cmdargs, mode='wk')
| |
| |
def cmd_verify_wk(msg):
    """Verify a mode=wk signature (key fetched from a .well-known URL)."""
    cmd_verify_dk(msg, mode='wk')
| |
| |
def cmd_sign_wkd(cmdargs):
    """Sign using a WKD-style per-identity devkey (mode=wkd)."""
    cmd_sign_dk(cmdargs, mode='wkd')
| |
| |
def cmd_verify_wkd(msg):
    """Verify a mode=wkd signature (key fetched via WKD-style lookup)."""
    cmd_verify_dk(msg, mode='wkd')
| |
| |
def cmd_verify(cmdargs):
    """Verify a signed message, dispatching on the m= mode of its sig header."""
    msg = load_message(cmdargs.message)
    # A hashes header is mandatory -- without it there is nothing to check
    if not msg.get(XPH_HDR):
        logger.critical('Message does not contain %s, nothing to verify', XPH_HDR)
        sys.exit(1)
    shdr = msg.get(XPS_HDR)
    if not shdr:
        # No sig header; fall back to plain DKIM verification if present
        if not msg.get('dkim-signature'):
            logger.critical('Message contains unsigned hashes, cannot verify')
            sys.exit(1)
        cmd_verify_dkim(msg)
        sys.exit(0)
    sdata = get_parts_from_header(shdr)
    dispatch = {
        'pgp': cmd_pgp_verify,
        'dk': cmd_verify_dk,
        'wk': cmd_verify_wk,
        'wkd': cmd_verify_wkd,
    }
    handler = dispatch.get(sdata['m'])
    if handler is None:
        logger.critical('Unknown mode: %s', sdata['m'])
        sys.exit(1)
    handler(msg)
    sys.exit(0)
| |
| |
if __name__ == '__main__':
    import argparse

    # noinspection PyTypeChecker
    parser = argparse.ArgumentParser(
        prog='main',
        description='A proof of concept tool for header-based email patch attestation',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-m', '--message', default='emails/unsigned.eml',
                        help='File with the message to work with')
    # store_true makes -v a proper boolean flag; without it argparse would
    # demand an argument after -v and the flag could never be used
    parser.add_argument('-v', '--verbose', action='store_true', default=False,
                        help='Print extra debugging output')
    subparsers = parser.add_subparsers(help='sub-command help', dest='subcmd')

    # hashes-hdr
    sp_hhdr = subparsers.add_parser('hashes-hdr', help='Create a hashes header')
    sp_hhdr.set_defaults(func=cmd_hashes_hdr)

    # sign-dkim
    sp_sdkim = subparsers.add_parser('sign-dkim', help='Create a DKIM signature')
    sp_sdkim.add_argument('-i', '--identity', default='@example.org')
    sp_sdkim.add_argument('-d', '--domain', default='example.org')
    sp_sdkim.add_argument('-s', '--selector', default='patches')
    sp_sdkim.add_argument('-k', '--privkey', default='rsa.key')
    sp_sdkim.set_defaults(func=cmd_sign_dkim)

    # sign-dk
    sp_sdk = subparsers.add_parser('sign-dk', help='Create a mode=dk signature')
    sp_sdk.add_argument('-i', '--identity', default='@example.org')
    sp_sdk.add_argument('-d', '--domain', default='example.org')
    sp_sdk.add_argument('-s', '--selector', default='patches')
    sp_sdk.add_argument('-k', '--privkey', default='dk.key')
    sp_sdk.set_defaults(func=cmd_sign_dk)

    # sign-wk
    sp_swk = subparsers.add_parser('sign-wk', help='Create a mode=wk signature')
    sp_swk.add_argument('-i', '--identity', default='@example.org')
    sp_swk.add_argument('-d', '--domain', default='example.org')
    sp_swk.add_argument('-s', '--selector', default='patches')
    sp_swk.add_argument('-k', '--privkey', default='dk.key')
    sp_swk.set_defaults(func=cmd_sign_wk)

    # sign-wkd
    sp_swkd = subparsers.add_parser('sign-wkd', help='Create a mode=wkd signature')
    sp_swkd.add_argument('-i', '--identity', default='dev@kernel.org')
    sp_swkd.add_argument('-d', '--domain', default='example.org')
    sp_swkd.add_argument('-s', '--selector', default='patches')
    sp_swkd.add_argument('-k', '--privkey', default='ingit.key')
    sp_swkd.set_defaults(func=cmd_sign_wkd)

    # sign-pgp
    sp_spgp = subparsers.add_parser('sign-pgp', help='Create a PGP signature')
    sp_spgp.add_argument('-i', '--identity', default='dev@kernel.org')
    sp_spgp.add_argument('-k', '--keyid', default='AAAABBBBCCCCDDDD')
    sp_spgp.add_argument('-u', '--subkeyid')
    sp_spgp.set_defaults(func=cmd_sign_pgp)

    # verify
    sp_verify = subparsers.add_parser('verify', help='Verify a signed message')
    sp_verify.set_defaults(func=cmd_verify)

    args = parser.parse_args()

    # Logger emits everything; the handler filters by verbosity
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('%(message)s')
    ch.setFormatter(formatter)
    if args.verbose:
        ch.setLevel(logging.DEBUG)
    else:
        ch.setLevel(logging.INFO)
    logger.addHandler(ch)

    # No subcommand given -- nothing to dispatch to
    if 'func' not in args:
        parser.print_help()
        sys.exit(1)

    args.func(args)