# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
import subprocess
import logging
import hashlib
import re
import sys
import gzip
import os
import fnmatch
import email.utils
import email.policy
import email.header
import email.generator
import email.quoprimime
import tempfile
import pathlib
import argparse
import smtplib
import shlex
import textwrap
import urllib.parse
import datetime
import time
import copy
import shutil
import mailbox
# noinspection PyCompatibility
import pwd
import requests
from pathlib import Path
from contextlib import contextmanager
from typing import Optional, Tuple, Set, List, BinaryIO, Union, Sequence, Literal
from email import charset
charset.add_charset('utf-8', None)
# Policy we use for saving mail locally
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
# Presence of these characters requires quoting of the name in the header
# adapted from email._parseaddr
qspecials = re.compile(r'[()<>@,:;.\"\[\]]')
try:
import dkim
can_dkim = True
except ModuleNotFoundError:
can_dkim = False
try:
import patatt
can_patatt = True
except ModuleNotFoundError:
can_patatt = False
# global setting allowing us to turn off networking
can_network = True
__VERSION__ = '0.13-dev'
PW_REST_API_VERSION = '1.2'
def _dkim_log_filter(record):
# Hide all dkim logging output in normal operation by setting the level to
# DEBUG. If debugging output has been enabled then prefix dkim logging
# output to make its origin clear.
record.levelno = logging.DEBUG
record.levelname = 'DEBUG'
record.msg = 'DKIM: ' + record.msg
return True
logger = logging.getLogger('b4')
dkimlogger = logger.getChild('dkim')
dkimlogger.addFilter(_dkim_log_filter)
HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')
DIFF_RE = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I)
DIFFSTAT_RE = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I)
ATT_PASS_SIMPLE = 'v'
ATT_FAIL_SIMPLE = 'x'
ATT_PASS_FANCY = '\033[32m\u2713\033[0m'
ATT_FAIL_FANCY = '\033[31m\u2717\033[0m'
DEVSIG_HDR = 'X-Developer-Signature'
LOREADDR = 'https://lore.kernel.org'
DEFAULT_CONFIG = {
'midmask': LOREADDR + '/all/%s',
'linkmask': LOREADDR + '/r/%s',
'searchmask': LOREADDR + '/all/?x=m&t=1&q=%s',
# You can override the format for the Link: trailer, e.g.
# if you would rather use the Message-Id trailer. It takes the
# message-id as the expansion for %s
# linktrailermask = Message-Id: <%s>
'listid-preference': '*.feeds.kernel.org,*.linux.dev,*.kernel.org,*',
'save-maildirs': 'no',
# off: do not bother checking attestation
# check: print an attaboy when attestation is found
# softfail: print a warning when no attestation found
# hardfail: exit with an error when no attestation found
'attestation-policy': 'softfail',
# How many days before we consider attestation too old?
'attestation-staleness-days': '30',
# Should we check DKIM signatures if we don't find any other attestation?
'attestation-check-dkim': 'yes',
# We'll use the default gnupg homedir, unless you set it here
'attestation-gnupghome': None,
# Do you like simple or fancy checkmarks?
'attestation-checkmarks': 'fancy',
# How long to keep things in cache before expiring (minutes)?
'cache-expire': '10',
# Used when creating summaries for b4 ty
'thanks-commit-url-mask': None,
# See thanks-pr-template.example
'thanks-pr-template': None,
# See thanks-am-template.example
'thanks-am-template': None,
# If this is not set, we'll use what we find in
# git-config for gpg.program, and if that's not set,
# we'll use "gpg" and hope for the better
'gpgbin': None,
# When sending mail, use this sendemail identity configuration
'sendemail-identity': None,
}
# This is where we store actual config
MAIN_CONFIG = None
# This is git-config user.*
USER_CONFIG = None
# This is git-config sendemail.*
SENDEMAIL_CONFIG = None
# Used for storing our requests session
REQSESSION = None
# Indicates that we've cleaned cache already
_CACHE_CLEANED = False
# Used to track mailmap replacements
MAILMAP_INFO = dict()
class LoreMailbox:
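    """A collection of LoreMessage objects making up one or more patch series.

    Messages added via add_message() are sorted into per-revision series,
    cover letters, follow-ups (replies carrying trailers), and unknowns.
    """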
msgid_map: dict
series: dict
covers: dict
followups: list
unknowns: list
def __init__(self):
self.msgid_map = dict()
self.series = dict()
self.covers = dict()
self.trailer_map = dict()
self.followups = list()
self.unknowns = list()
def __repr__(self):
out = list()
for key, lser in self.series.items():
out.append(str(lser))
out.append('--- Followups ---')
for lmsg in self.followups:
out.append(' %s' % lmsg.full_subject)
out.append('--- Unknowns ---')
for lmsg in self.unknowns:
out.append(' %s' % lmsg.full_subject)
return '\n'.join(out)
def get_by_msgid(self, msgid: str) -> Optional['LoreMessage']:
if msgid in self.msgid_map:
return self.msgid_map[msgid]
return None
def partial_reroll(self, revision, sloppytrailers):
# Is it a partial reroll?
# To qualify for a partial reroll:
# 1. Needs to be version > 1
# 2. Replies need to be to the exact X/N of the previous revision
if revision <= 1 or revision - 1 not in self.series:
return
# Are existing patches replies to previous revisions with the same counter?
pser = self.get_series(revision-1, sloppytrailers=sloppytrailers)
lser = self.series[revision]
sane = True
for patch in lser.patches:
if patch is None:
continue
if patch.in_reply_to is None or patch.in_reply_to not in self.msgid_map:
logger.debug('Patch not sent as a reply-to')
sane = False
break
ppatch = self.msgid_map[patch.in_reply_to]
found = False
while True:
if patch.counter == ppatch.counter and patch.expected == ppatch.expected:
logger.debug('Found a previous matching patch in v%s', ppatch.revision)
found = True
break
# Do we have another level up?
if ppatch.in_reply_to is None or ppatch.in_reply_to not in self.msgid_map:
break
ppatch = self.msgid_map[ppatch.in_reply_to]
if not found:
sane = False
logger.debug('Patch not a reply to a patch with the same counter/expected (%s/%s != %s/%s)',
patch.counter, patch.expected, ppatch.counter, ppatch.expected)
break
if not sane:
logger.debug('Not a sane partial reroll')
return
logger.info('Partial reroll detected, reconstituting from v%s', pser.revision)
logger.debug('Reconstituting a partial reroll')
at = 0
for patch in lser.patches:
if pser.patches[at] is None:
at += 1
continue
if patch is None:
ppatch = copy.deepcopy(pser.patches[at])
ppatch.revision = lser.revision
ppatch.reroll_from_revision = pser.revision
lser.patches[at] = ppatch
else:
patch.reroll_from_revision = lser.revision
at += 1
if None not in lser.patches[1:]:
lser.complete = True
lser.partial_reroll = True
if lser.patches[0] is not None:
lser.has_cover = True
lser.subject = pser.subject
logger.debug('Reconstituted successfully')
def get_series(self, revision=None, sloppytrailers=False, reroll=True) -> Optional['LoreSeries']:
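        """Return the LoreSeries for the given revision (the highest one if
        None), after attempting partial-reroll reconstruction and carrying
        over follow-up trailers from earlier revisions when the patchwork
        hash matched. Returns None if the revision is unknown or every
        patch in it is missing.
        """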
if revision is None:
if not len(self.series):
return None
# Use the highest revision
revision = max(self.series.keys())
elif revision not in self.series.keys():
return None
lser = self.series[revision]
# Is it empty?
empty = True
for lmsg in lser.patches:
if lmsg is not None:
empty = False
break
if empty:
logger.critical('All patches in series v%s are missing.', lser.revision)
return None
if not lser.complete and reroll:
self.partial_reroll(revision, sloppytrailers)
# Grab our cover letter if we have one
if revision in self.covers:
lser.add_patch(self.covers[revision])
lser.has_cover = True
else:
# Let's find the first patch with an in-reply-to and see if that
# is our cover letter
for member in lser.patches:
if member is not None and member.in_reply_to is not None:
potential = self.get_by_msgid(member.in_reply_to)
if potential is not None and potential.has_diffstat and not potential.has_diff:
# This is *probably* the cover letter
lser.patches[0] = potential
lser.has_cover = True
break
# Do we have any follow-ups?
for fmsg in self.followups:
logger.debug('Analyzing follow-up: %s (%s)', fmsg.full_subject, fmsg.fromemail)
# If there are no trailers in this one, ignore it
if not len(fmsg.trailers):
logger.debug(' no trailers found, skipping')
continue
# Go up through the follow-ups and tally up trailers until
# we either run out of in-reply-tos, or we find a patch in
# one of our series
if fmsg.in_reply_to is None:
# Check if there's something matching in References
refs = fmsg.msg.get('References', '')
pmsg = None
for ref in refs.split():
refid = ref.strip('<>')
if refid in self.msgid_map and refid != fmsg.msgid:
pmsg = self.msgid_map[refid]
break
if pmsg is None:
# Can't find the message we're replying to here
continue
elif fmsg.in_reply_to in self.msgid_map:
pmsg = self.msgid_map[fmsg.in_reply_to]
else:
logger.debug(' missing message, skipping: %s', fmsg.in_reply_to)
continue
trailers, mismatches = fmsg.get_trailers(sloppy=sloppytrailers)
for ltr in mismatches:
lser.trailer_mismatches.add((ltr.name, ltr.value, fmsg.fromname, fmsg.fromemail))
lvl = 1
while True:
logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject)
logger.debug('%sTrailers:', ' ' * lvl)
for ltr in trailers:
logger.debug('%s%s: %s', ' ' * (lvl+1), ltr.name, ltr.value)
if pmsg.has_diff and not pmsg.reply:
# We found the patch for these trailers
if pmsg.revision != revision:
# add this into our trailer map to carry over trailers from
# previous revisions to current revision if patch id did
# not change
if pmsg.pwhash:
if pmsg.pwhash not in self.trailer_map:
self.trailer_map[pmsg.pwhash] = list()
self.trailer_map[pmsg.pwhash] += trailers
pmsg.followup_trailers += trailers
break
if not pmsg.reply:
# Could be a cover letter
pmsg.followup_trailers += trailers
break
if pmsg.in_reply_to and pmsg.in_reply_to in self.msgid_map:
# Avoid bad message id causing infinite loop
if pmsg == self.msgid_map[pmsg.in_reply_to]:
break
lvl += 1
for pltr in pmsg.trailers:
pltr.lmsg = pmsg
trailers.append(pltr)
pmsg = self.msgid_map[pmsg.in_reply_to]
continue
break
# Carry over trailers from previous series if patch/metadata did not change
for lmsg in lser.patches:
if lmsg is None or lmsg.pwhash is None:
continue
if lmsg.pwhash in self.trailer_map:
lmsg.followup_trailers += self.trailer_map[lmsg.pwhash]
return lser
def add_message(self, msg: email.message.Message) -> None:
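        """File a single message into the mailbox: replies are queued as
        follow-ups, [0/n] messages become cover letters, patches are added
        to the series matching their revision (created on demand), and
        everything else lands in unknowns.
        """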
msgid = LoreMessage.get_clean_msgid(msg)
if msgid and msgid in self.msgid_map:
logger.debug('Already have a message with this msgid, skipping %s', msgid)
return
lmsg = LoreMessage(msg)
logger.debug('Looking at: %s', lmsg.full_subject)
if msgid:
self.msgid_map[lmsg.msgid] = lmsg
if lmsg.reply:
# We'll figure out where this belongs later
logger.debug(' adding to followups')
self.followups.append(lmsg)
return
if lmsg.counter == 0 and (not lmsg.counters_inferred or lmsg.has_diffstat):
# Cover letter
# Add it to covers -- we'll deal with them later
logger.debug(' adding as v%s cover letter', lmsg.revision)
self.covers[lmsg.revision] = lmsg
return
if lmsg.has_diff:
if lmsg.revision not in self.series:
if lmsg.revision_inferred and lmsg.in_reply_to:
# We have an inferred revision here.
# Do we have an upthread cover letter that specifies a revision?
irt = self.get_by_msgid(lmsg.in_reply_to)
if irt is not None and irt.has_diffstat and not irt.has_diff:
# Yes, this is very likely our cover letter
logger.debug(' fixed revision to v%s', irt.revision)
lmsg.revision = irt.revision
# alternatively, see if upthread is patch 1
elif lmsg.counter > 0 and irt is not None and irt.has_diff and irt.counter == 1:
logger.debug(' fixed revision to v%s', irt.revision)
lmsg.revision = irt.revision
# Run our check again
if lmsg.revision not in self.series:
self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected)
if len(self.series) > 1:
logger.debug('Found new series v%s', lmsg.revision)
# Attempt to auto-number series from the same author who did not bother
# to set v2, v3, etc. in the patch revision
if (lmsg.counter == 1 and lmsg.counters_inferred
and not lmsg.reply and lmsg.lsubject.patch and not lmsg.lsubject.resend):
omsg = self.series[lmsg.revision].patches[lmsg.counter]
if (omsg is not None and omsg.counters_inferred and lmsg.fromemail == omsg.fromemail
and omsg.date < lmsg.date):
lmsg.revision = len(self.series) + 1
self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected)
logger.info('Assuming new revision: v%s (%s)', lmsg.revision, lmsg.full_subject)
logger.debug(' adding as patch')
self.series[lmsg.revision].add_patch(lmsg)
return
logger.debug(' adding to unknowns')
self.unknowns.append(lmsg)
class LoreSeries:
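    """A single revision of a patch series, with patches stored by their
    position in the series (index 0 is reserved for the cover letter).
    """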
revision: int
expected: int
patches: List[Optional['LoreMessage']]
followups: List['LoreMessage']
trailer_mismatches: Set[Tuple[str, str, str, str]]
complete: bool = False
has_cover: bool = False
partial_reroll: bool = False
subject: str
indexes: Optional[List[Tuple[str, str]]] = None
base_commit: Optional[str] = None
change_id: Optional[str] = None
def __init__(self, revision: int, expected: int) -> None:
self.revision = revision
self.expected = expected
self.patches = [None] * (expected+1)
self.followups = list()
self.trailer_mismatches = set()
self.subject = '(untitled)'
def __repr__(self):
out = list()
out.append('- Series: [v%s] %s' % (self.revision, self.subject))
out.append(' revision: %s' % self.revision)
out.append(' expected: %s' % self.expected)
out.append(' complete: %s' % self.complete)
out.append(' has_cover: %s' % self.has_cover)
out.append(' base_commit: %s' % self.base_commit)
out.append(' change_id: %s' % self.change_id)
out.append(' partial_reroll: %s' % self.partial_reroll)
out.append(' patches:')
at = 0
for member in self.patches:
if member is not None:
out.append(' [%s/%s] %s' % (at, self.expected, member.subject))
else:
out.append(' [%s/%s] MISSING' % (at, self.expected))
at += 1
return '\n'.join(out)
def add_patch(self, lmsg: 'LoreMessage') -> None:
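        """Insert a patch at its counter position, growing the series if the
        patch expects more members, and update completeness/cover metadata
        (base-commit and change-id are extracted from the cover letter).
        """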
while len(self.patches) < lmsg.expected + 1:
self.patches.append(None)
self.expected = lmsg.expected
if self.patches[lmsg.counter] is not None:
# Okay, weird, is the one in there a reply?
omsg = self.patches[lmsg.counter]
if omsg.reply or (omsg.counters_inferred and not lmsg.counters_inferred):
# Replace that one with this one
logger.debug(' replacing existing: %s', omsg.subject)
self.patches[lmsg.counter] = lmsg
else:
self.patches[lmsg.counter] = lmsg
self.complete = not (None in self.patches[1:])
if lmsg.counter == 0:
# This is a cover letter
if '\nbase-commit:' in lmsg.body:
matches = re.search(r'^base-commit: .*?([\da-f]+)', lmsg.body, flags=re.I | re.M)
if matches:
self.base_commit = matches.groups()[0]
if '\nchange-id:' in lmsg.body:
matches = re.search(r'^change-id:\s+(\S+)', lmsg.body, flags=re.I | re.M)
if matches:
self.change_id = matches.groups()[0]
if self.patches[0] is not None:
self.subject = self.patches[0].subject
elif self.patches[1] is not None:
self.subject = self.patches[1].subject
def get_slug(self, extended: bool = False) -> str:
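        """Generate a filename-safe slug for this series, e.g.
        '20200107_jdoe_example_com' or, with extended=True,
        'v2_20200107_jdoe_fix_some_bug' (truncated to 100 characters).
        """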
# Find the first non-None entry
lmsg = None
for lmsg in self.patches:
if lmsg is not None:
break
if lmsg is None:
return 'undefined'
prefix = lmsg.date.strftime('%Y%m%d')
authorline = email.utils.getaddresses([str(x) for x in lmsg.msg.get_all('from', [])])[0]
if extended:
local = authorline[1].split('@')[0]
unsafe = '%s_%s_%s' % (prefix, local, lmsg.subject)
slug = re.sub(r'\W+', '_', unsafe).strip('_').lower()
else:
author = re.sub(r'\W+', '_', authorline[1]).strip('_').lower()
slug = '%s_%s' % (prefix, author)
if self.revision != 1:
slug = 'v%s_%s' % (self.revision, slug)
return slug[:100]
def add_extra_trailers(self, trailers: tuple) -> None:
for lmsg in self.patches[1:]:
if lmsg is None:
continue
lmsg.followup_trailers += trailers
def add_cover_trailers(self) -> None:
if self.patches[0] and self.patches[0].followup_trailers: # noqa
self.add_extra_trailers(self.patches[0].followup_trailers) # noqa
def get_am_ready(self, noaddtrailers=False, covertrailers=False, addmysob=False, addlink=False,
cherrypick=None, copyccs=False, allowbadchars=False) -> List[email.message.Message]:
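        """Render the series as a list of git-am-ready messages: check
        attestation according to attestation-policy, apply follow-up and
        extra trailers, and honor the cherrypick/copyccs options.
        """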
usercfg = get_user_config()
config = get_main_config()
if addmysob:
if 'name' not in usercfg or 'email' not in usercfg:
logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
addmysob = False
attpolicy = config['attestation-policy']
try:
maxdays = int(config['attestation-staleness-days'])
except ValueError:
logger.info('WARNING: attestation-staleness-days must be an int')
maxdays = 0
# Loop through all patches and see if attestation is the same for all of them,
# since it usually is
attref = None
attsame = True
attmark = None
attcrit = False
if attpolicy != 'off':
logger.info('Checking attestation on all messages, may take a moment...')
for lmsg in self.patches[1:]:
if lmsg is None:
attsame = False
break
checkmark, trailers, attcrit = lmsg.get_attestation_trailers(attpolicy, maxdays)
if attref is None:
attref = trailers
attmark = checkmark
continue
if set(trailers) == set(attref):
continue
attsame = False
logger.debug('Attestation info is not the same')
break
if covertrailers:
self.add_cover_trailers()
at = 1
msgs = list()
logger.info('---')
for lmsg in self.patches[1:]:
if cherrypick is not None:
if at not in cherrypick:
at += 1
logger.debug(' skipped: [%s/%s] (not in cherrypick)', at, self.expected)
continue
if lmsg is None:
logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected)
raise KeyError('Cherrypick not in series')
if lmsg is not None:
extras = list()
if addlink:
linktrailer = None
ltrmask = config.get('linktrailermask')
if ltrmask:
                        if ltrmask.find(':') > 0:
lparts = ltrmask.split(':', maxsplit=1)
llname = lparts[0].strip()
llval = lparts[1].strip() % lmsg.msgid
linktrailer = LoreTrailer(name=llname, value=llval)
else:
logger.critical('linktrailermask does not look like a valid trailer, using defaults')
if not linktrailer:
llval = config.get('linkmask', LOREADDR + '/r/%s') % lmsg.msgid
linktrailer = LoreTrailer(name='Link', value=llval)
extras.append(linktrailer)
if attsame and not attcrit:
if attmark:
logger.info(' %s %s', attmark, lmsg.get_am_subject())
else:
logger.info(' %s', lmsg.get_am_subject())
else:
checkmark, trailers, critical = lmsg.get_attestation_trailers(attpolicy, maxdays)
if checkmark:
logger.info(' %s %s', checkmark, lmsg.get_am_subject())
else:
logger.info(' %s', lmsg.get_am_subject())
for trailer in trailers:
logger.info(' %s', trailer)
if critical:
logger.critical('---')
logger.critical('Exiting due to attestation-policy: hardfail')
sys.exit(128)
add_trailers = True
if noaddtrailers:
add_trailers = False
msg = lmsg.get_am_message(add_trailers=add_trailers, extras=extras, copyccs=copyccs,
addmysob=addmysob, allowbadchars=allowbadchars)
msgs.append(msg)
else:
logger.error(' ERROR: missing [%s/%s]!', at, self.expected)
at += 1
if attpolicy == 'off':
return msgs
if attsame and attref:
logger.info(' ---')
for trailer in attref:
logger.info(' %s', trailer)
if not (can_dkim and can_patatt):
logger.info(' ---')
if not can_dkim:
logger.info(' NOTE: install dkimpy for DKIM signature verification')
if not can_patatt:
logger.info(' NOTE: install patatt for end-to-end signature verification')
return msgs
def populate_indexes(self):
self.indexes = list()
seenfiles = set()
for lmsg in self.patches[1:]:
if lmsg is None or lmsg.blob_indexes is None:
continue
for ofn, obh, nfn in lmsg.blob_indexes:
if ofn in seenfiles:
                    # if we have seen this file once already, then this is a repeat patch
                    # and its old blob hash is no longer going to match the current tree
continue
seenfiles.add(ofn)
if set(obh) == {'0'}:
# New file, will for sure apply clean
continue
self.indexes.append((ofn, obh))
def check_applies_clean(self, gitdir: str, at: Optional[str] = None) -> Tuple[int, list]:
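        """Count how many pre-image blob indexes from this series are present
        at the given commit (HEAD by default); returns the number of blobs
        checked and the list of (filename, blobhash) mismatches.
        """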
if self.indexes is None:
self.populate_indexes()
mismatches = list()
if at is None:
at = 'HEAD'
for fn, bh in self.indexes:
ecode, out = git_run_command(gitdir, ['ls-tree', at, fn])
if ecode == 0 and len(out):
chunks = out.split()
if chunks[2].startswith(bh):
logger.debug('%s hash: matched', fn)
continue
else:
logger.debug('%s hash: %s (expected: %s)', fn, chunks[2], bh)
else:
# Couldn't get this file, continue
logger.debug('Could not look up %s:%s', at, fn)
mismatches.append((fn, bh))
return len(self.indexes), mismatches
    def find_base(self, gitdir: str, branches: Optional[list] = None, maxdays: int = 30) -> Tuple[str, int, int]:
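        """Walk the git history to find the ref that best matches the blob
        pre-images of this series, using git log --find-object within the
        maxdays window; returns (refname, total blobs, fewest mismatches)
        and raises IndexError when nothing matches at all.
        """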
# Find the date of the first patch we have
pdate = datetime.datetime.now()
for lmsg in self.patches:
if lmsg is None:
continue
pdate = lmsg.date
break
# Find the latest commit on that date
guntil = pdate.strftime('%Y-%m-%d')
if branches:
where = branches
else:
where = ['--all']
gitargs = ['log', '--pretty=oneline', '--until', guntil, '--max-count=1'] + where
lines = git_get_command_lines(gitdir, gitargs)
if not lines:
raise IndexError
commit = lines[0].split()[0]
checked, mismatches = self.check_applies_clean(gitdir, commit)
fewest = len(mismatches)
if fewest > 0:
since = pdate - datetime.timedelta(days=maxdays)
gsince = since.strftime('%Y-%m-%d')
logger.debug('Starting --find-object from %s to %s', gsince, guntil)
best = commit
for fn, bi in mismatches:
logger.debug('Finding tree matching %s=%s in %s', fn, bi, where)
gitargs = ['log', '--pretty=oneline', '--since', gsince, '--until', guntil,
'--find-object', bi] + where
lines = git_get_command_lines(gitdir, gitargs)
if not lines:
logger.debug('Could not find object %s in the tree', bi)
continue
for line in lines:
commit = line.split()[0]
logger.debug('commit=%s', commit)
# We try both that commit and the one preceding it, in case it was a deletion
# Keep track of the fewest mismatches
for tc in [commit, f'{commit}~1']:
sc, sm = self.check_applies_clean(gitdir, tc)
if len(sm) < fewest and len(sm) != sc:
fewest = len(sm)
best = tc
logger.debug('fewest=%s, best=%s', fewest, best)
if fewest == 0:
break
if fewest == 0:
break
if fewest == 0:
break
else:
best = commit
if fewest == len(self.indexes):
# None of the blobs matched
raise IndexError
lines = git_get_command_lines(gitdir, ['describe', '--all', best])
if len(lines):
return lines[0], len(self.indexes), fewest
raise IndexError
def make_fake_am_range(self, gitdir):
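        """Create a synthetic start..end commit range by reconstructing the
        pre-patch tree from blob indexes in a temporary worktree and running
        git am on top of it; results are cached under the first msgid.
        """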
start_commit = end_commit = None
# Use the msgid of the first non-None patch in the series
msgid = None
for lmsg in self.patches:
if lmsg is not None:
msgid = lmsg.msgid
break
if msgid is None:
logger.critical('Cannot operate on an empty series')
return None, None
cachedata = get_cache(msgid, suffix='fakeam')
if cachedata and not self.partial_reroll:
stalecache = False
chunks = cachedata.strip().split()
if len(chunks) == 2:
start_commit, end_commit = chunks
else:
stalecache = True
if start_commit is not None and end_commit is not None:
# Make sure they are still there
ecode, out = git_run_command(gitdir, ['cat-file', '-e', start_commit])
if ecode > 0:
stalecache = True
else:
ecode, out = git_run_command(gitdir, ['cat-file', '-e', end_commit])
if ecode > 0:
stalecache = True
else:
logger.debug('Using previously generated range')
return start_commit, end_commit
if stalecache:
logger.debug('Stale cache for [v%s] %s', self.revision, self.subject)
clear_cache(msgid, suffix='fakeam')
logger.info('Preparing fake-am for v%s: %s', self.revision, self.subject)
with git_temp_worktree(gitdir):
# We are in a temporary chdir at this time, so writing to a known file should be safe
mbxf = '.__git-am__'
mbx = mailbox.mbox(mbxf)
# Logic largely borrowed from gj_tools
seenfiles = set()
for lmsg in self.patches[1:]:
if lmsg is None:
logger.critical('ERROR: v%s series incomplete; unable to create a fake-am range', self.revision)
return None, None
logger.debug('Looking at %s', lmsg.full_subject)
if not lmsg.blob_indexes:
logger.critical('ERROR: some patches do not have indexes')
logger.critical(' unable to create a fake-am range')
return None, None
for ofn, ofi, nfn in lmsg.blob_indexes:
if ofn in seenfiles:
# We already processed this file, so this blob won't match
continue
seenfiles.add(ofn)
if set(ofi) == {'0'}:
# New file creation, nothing to do here
logger.debug(' New file: %s', ofn)
continue
if not ofn == nfn:
# renamed file, make sure to not add the new name later on
logger.debug(' Renamed file: %s -> %s', ofn, nfn)
seenfiles.add(nfn)
# Try to grab full ref_id of this hash
ecode, out = git_run_command(gitdir, ['rev-parse', ofi])
if ecode > 0:
logger.critical(' ERROR: Could not find matching blob for %s (%s)', ofn, ofi)
logger.critical(' If you know on which tree this patchset is based,')
logger.critical(' add it as a remote and perform "git remote update"')
logger.critical(' in order to fetch the missing objects.')
return None, None
logger.debug(' Found matching blob for: %s', ofn)
fullref = out.strip()
gitargs = ['update-index', '--add', '--cacheinfo', f'0644,{fullref},{ofn}']
ecode, out = git_run_command(None, gitargs)
if ecode > 0:
logger.critical(' ERROR: Could not run update-index for %s (%s)', ofn, fullref)
return None, None
mbx.add(lmsg.msg.as_string(policy=emlpolicy).encode('utf-8'))
mbx.close()
ecode, out = git_run_command(None, ['write-tree'])
if ecode > 0:
logger.critical('ERROR: Could not write fake-am tree')
return None, None
treeid = out.strip()
# At this point we have a worktree with files that should cleanly receive a git am
gitargs = ['commit-tree', treeid + '^{tree}', '-F', '-']
ecode, out = git_run_command(None, gitargs, stdin='Initial fake commit'.encode('utf-8'))
if ecode > 0:
logger.critical('ERROR: Could not commit-tree')
return None, None
start_commit = out.strip()
git_run_command(None, ['reset', '--hard', start_commit])
ecode, out = git_run_command(None, ['am', mbxf])
if ecode > 0:
logger.critical('ERROR: Could not fake-am version %s', self.revision)
return None, None
ecode, out = git_run_command(None, ['rev-parse', 'HEAD'])
end_commit = out.strip()
logger.info(' range: %.12s..%.12s', start_commit, end_commit)
logger.debug('Saving into cache:')
logger.debug(' %s..%s', start_commit, end_commit)
save_cache(f'{start_commit} {end_commit}\n', msgid, suffix='fakeam')
return start_commit, end_commit
def save_cover(self, outfile):
# noinspection PyUnresolvedReferences
cover_msg = self.patches[0].get_am_message(add_trailers=False)
with open(outfile, 'wb') as fh:
fh.write(LoreMessage.get_msg_as_bytes(cover_msg, headers='decode'))
logger.critical('Cover: %s', outfile)
class LoreTrailer:
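    """A single commit-message trailer (e.g. Signed-off-by), classified as
    a person, utility, or unknown trailer; constructed with no arguments,
    it becomes the Signed-off-by of the configured git user.
    """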
type: str
name: str
lname: str
value: str
extinfo: Optional[str] = None
addr: Optional[Tuple[str, str]] = None
lmsg = None
# Small list of recognized utility trailers
_utility: Set[str] = {'fixes', 'link', 'buglink', 'closes', 'obsoleted-by', 'message-id', 'change-id', 'base-commit'}
def __init__(self, name: Optional[str] = None, value: Optional[str] = None, extinfo: Optional[str] = None,
msg: Optional[email.message.Message] = None):
if name is None:
self.name = 'Signed-off-by'
ucfg = get_user_config()
self.value = '%s <%s>' % (ucfg['name'], ucfg['email'])
self.type = 'person'
self.addr = (ucfg['name'], ucfg['email'])
else:
self.name = name
self.value = value
if name.lower() in self._utility or '://' in value:
self.type = 'utility'
elif re.search(r'\S+@\S+\.\S+', value):
self.type = 'person'
self.addr = email.utils.parseaddr(value)
else:
self.type = 'unknown'
self.lname = self.name.lower()
self.extinfo = extinfo
self.msg = msg
def as_string(self, omit_extinfo: bool = False) -> str:
ret = f'{self.name}: {self.value}'
if not self.extinfo or omit_extinfo:
return ret
        # extinfo can either be [on the next line], or # at the end
if self.extinfo.lstrip()[0] == '#':
ret += self.extinfo
else:
ret += f'\n{self.extinfo}'
return ret
def email_eq(self, cmp_email: str, fuzzy: bool = True) -> bool:
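        """Compare the trailer address with another email; fuzzy matching
        ignores extended local parts (foo+kernel@example.com matches
        foo@example.com) and allows one subdomain level of difference
        (foo@linux.intel.com matches foo@intel.com).
        """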
if not self.addr:
return False
our = self.addr[1].lower()
their = cmp_email.lower()
if our == their:
return True
if not fuzzy:
return False
if '@' not in our or '@' not in their:
return False
# Strip extended local parts often added by people, e.g.:
# comparing foo@example.com and foo+kernel@example.com should match
our = re.sub(r'\+[^@]+@', '@', our)
their = re.sub(r'\+[^@]+@', '@', their)
if our == their:
return True
# See if domain part of one of the addresses is a subset of the other one,
# which should match cases like foo@linux.intel.com and foo@intel.com
olocal, odomain = our.split('@', maxsplit=1)
tlocal, tdomain = their.split('@', maxsplit=1)
if olocal != tlocal:
return False
if (abs(odomain.count('.')-tdomain.count('.')) == 1
and (odomain.endswith(f'.{tdomain}') or tdomain.endswith(f'.{odomain}'))):
return True
return False
def __eq__(self, other):
        # We never compare extinfo, we just tack it on if we find a match
return self.lname == other.lname and self.value.lower() == other.value.lower()
def __hash__(self):
return hash(f'{self.lname}: {self.value}')
def __repr__(self):
out = list()
out.append(' type: %s' % self.type)
out.append(' name: %s' % self.name)
out.append(' value: %s' % self.value)
out.append(' extinfo: %s' % self.extinfo)
return '\n'.join(out)
class LoreMessage:
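    """A single mailing-list message, with subject, header, body, trailer,
    and attestation information parsed out on instantiation.
    """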
def __init__(self, msg):
self.msg = msg
self.msgid = None
# Subject-based info
self.lsubject = None
self.full_subject = None
self.subject = None
self.reply = False
self.revision = 1
self.reroll_from_revision = None
self.counter = 1
self.expected = 1
self.revision_inferred = True
self.counters_inferred = True
# Header-based info
self.in_reply_to = None
self.fromname = None
self.fromemail = None
self.date = None
# Body and body-based info
self.body = None
self.message = None
self.charset = 'utf-8'
self.has_diff = False
self.has_diffstat = False
self.trailers = list()
self.followup_trailers = list()
# These are populated by pr
self.pr_base_commit = None
self.pr_repo = None
self.pr_ref = None
self.pr_tip_commit = None
self.pr_remote_tip_commit = None
# Patchwork hash
self.pwhash = None
# Blob indexes
self.blob_indexes = None
self.msgid = LoreMessage.get_clean_msgid(self.msg)
self.lsubject = LoreSubject(msg['Subject'])
# Copy them into this object for convenience
self.full_subject = self.lsubject.full_subject
self.subject = self.lsubject.subject
self.reply = self.lsubject.reply
self.revision = self.lsubject.revision
self.counter = self.lsubject.counter
self.expected = self.lsubject.expected
self.revision_inferred = self.lsubject.revision_inferred
self.counters_inferred = self.lsubject.counters_inferred
# Loaded when attestors property is called
self._attestors = None
# Handle [PATCH 6/5]
if self.counter > self.expected:
self.expected = self.counter
self.in_reply_to = LoreMessage.get_clean_msgid(self.msg, header='In-Reply-To')
try:
fromdata = email.utils.getaddresses([LoreMessage.clean_header(str(x))
for x in self.msg.get_all('from', [])])[0]
self.fromname = fromdata[0]
self.fromemail = fromdata[1]
if not len(self.fromname.strip()):
self.fromname = self.fromemail
except IndexError:
pass
msgdate = self.msg.get('Date')
if msgdate:
self.date = email.utils.parsedate_to_datetime(str(msgdate))
else:
# An email without a Date: field?
self.date = datetime.datetime.now()
# Force it to UTC if it's naive
if self.date.tzinfo is None:
self.date = self.date.replace(tzinfo=datetime.timezone.utc)
# walk until we find the first text/plain part
self.body, self.charset = LoreMessage.get_payload(self.msg)
if self.body is None:
# Woah, we didn't find any usable parts
logger.debug(' No plain or patch parts found in message')
logger.info(' Not plaintext: %s', self.full_subject)
return
if DIFFSTAT_RE.search(self.body):
self.has_diffstat = True
if DIFF_RE.search(self.body):
self.has_diff = True
self.pwhash = LoreMessage.get_patchwork_hash(self.body)
self.blob_indexes = LoreMessage.get_indexes(self.body)
trailers, others = LoreMessage.find_trailers(self.body, followup=True)
# We only pay attention to trailers that are sent in reply
if trailers and self.in_reply_to and not self.has_diff:
logger.debug('A follow-up missing a Re: but containing a trailer with no patch diff')
self.reply = True
if self.reply:
for trailer in trailers:
# These are commonly part of patch/commit metadata
badtrailers = {'from', 'author', 'cc', 'to', 'date', 'subject',
'subscribe', 'unsubscribe'}
if trailer.lname not in badtrailers:
self.trailers.append(trailer)
def get_trailers(self, sloppy: bool = False) -> Tuple[List[LoreTrailer], Set[LoreTrailer]]:
trailers = list()
mismatches = set()
for ltr in self.trailers:
ltr.lmsg = self
if sloppy or ltr.type != 'person':
trailers.append(ltr)
continue
if ltr.email_eq(self.fromemail):
logger.debug(' trailer email match')
trailers.append(ltr)
continue
# Does the name match, at least?
nmatch = False
tlname = ltr.addr[0].lower()
hlname = self.fromname.lower()
if tlname == hlname:
logger.debug(' trailer exact name match')
nmatch = True
# Finally, see if the header From has a comma in it and try to find all
# parts in the trailer name
elif hlname.find(',') > 0:
nmatch = True
for nchunk in hlname.split(','):
                    if tlname.find(nchunk.strip()) < 0:
nmatch = False
break
if nmatch:
logger.debug(' trailer fuzzy name match')
trailers.append(ltr)
continue
logger.debug('trailer did not match: %s: %s', ltr.name, ltr.value)
mismatches.add(ltr)
return trailers, mismatches
@property
def attestors(self):
if self._attestors is not None:
return self._attestors
self._attestors = list()
config = get_main_config()
if config['attestation-policy'] == 'off':
return self._attestors
logger.debug('Loading attestation: %s', self.full_subject)
if self.msg.get(DEVSIG_HDR):
self._load_patatt_attestors()
if self.msg.get('dkim-signature') and config['attestation-check-dkim'] == 'yes':
self._load_dkim_attestors()
logger.debug('Attestors: %s', len(self._attestors))
return self._attestors
def _load_dkim_attestors(self) -> None:
if not can_network:
logger.debug('Message has DKIM signatures, but can_network is off')
return
if not can_dkim:
logger.debug('Message has DKIM signatures, but can_dkim is off')
return
# Yank out all DKIM-Signature headers and try them in reverse order
# until we come to a passing one
dkhdrs = list()
for header in list(self.msg._headers): # noqa
if header[0].lower() == 'dkim-signature':
dkhdrs.append(header)
self.msg._headers.remove(header) # noqa
dkhdrs.reverse()
seenatts = list()
for hn, hval in dkhdrs:
# Handle MIME encoded-word syntax or other types of header encoding if
# present.
if '?q?' in hval:
hval = str(email.header.make_header(email.header.decode_header(hval)))
errors = list()
hdata = LoreMessage.get_parts_from_header(hval)
logger.debug('Loading DKIM attestation for d=%s, s=%s', hdata['d'], hdata['s'])
identity = hdata.get('i')
if not identity:
identity = hdata.get('d')
ts = hdata.get('t')
signtime = None
if ts:
signtime = LoreAttestor.parse_ts(ts)
else:
# See if date is included in the h: field
sh = hdata.get('h')
                if sh and 'date' in sh.lower().split(':'):
signtime = self.date
self.msg._headers.append((hn, hval)) # noqa
try:
res = dkim.verify(self.msg.as_bytes(policy=emlpolicy), logger=dkimlogger)
logger.debug('DKIM verify results: %s=%s', identity, res)
except Exception as ex: # noqa
# Usually, this is due to some DNS resolver failure, which we can't
# possibly cleanly try/catch. Just mark it as failed and move on.
logger.debug('DKIM attestation failed: %s', ex)
errors.append(str(ex))
res = False
attestor = LoreAttestorDKIM(res, identity, signtime, errors)
if attestor.check_identity(self.fromemail):
# use this one, regardless of any other DKIM signatures
self._attestors.append(attestor)
return
self.msg._headers.pop(-1) # noqa
seenatts.append(attestor)
# No exact domain matches, so return everything we have
self._attestors += seenatts
def _trim_body(self) -> None:
# Get the length specified in the X-Developer-Signature header
xdsh = self.msg.get('X-Developer-Signature')
if not xdsh:
return
matches = re.search(r'\s+l=(\d+)', xdsh)
if not matches:
return
bl = int(matches.groups()[0])
i, m, p = get_mailinfo(self.msg.as_bytes(policy=emlpolicy), scissors=False)
bb = b''
for line in re.sub(rb'[\r\n]*$', b'', m + p).split(b'\n'):
bb += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
if len(bb) > bl:
self.body = bb[:bl].decode()
# This may have potentially resulted in in-body From/Subject being removed,
# so make sure we put them back into the body if they are different
ibh = list()
if i.get('Subject') != self.subject:
ibh.append('Subject: ' + i.get('Subject'))
if i.get('Email') != self.fromemail or i.get('Author') != self.fromname:
ibh.append('From: ' + format_addrs([(i.get('Author'), i.get('Email'))]))
if len(ibh):
self.body = '\n'.join(ibh) + '\n\n' + self.body
def _load_patatt_attestors(self) -> None:
if not can_patatt:
logger.debug('Message has %s headers, but can_patatt is off', DEVSIG_HDR)
return
# load our key sources if necessary
ddir = get_data_dir()
pdir = os.path.join(ddir, 'keyring')
config = get_main_config()
sources = config.get('keyringsrc')
if not sources:
# fallback to patatt's keyring if none is specified for b4
patatt_config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc'])
sources = patatt_config.get('keyringsrc')
if not sources:
sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
if pdir not in sources:
sources.append(pdir)
# Push our logger and GPGBIN into patatt
patatt.logger = logger
patatt.GPGBIN = config['gpgbin']
logger.debug('Loading patatt attestations with sources=%s', str(sources))
success = False
trim_body = False
while True:
attestations = patatt.validate_message(self.msg.as_bytes(policy=emlpolicy), sources, trim_body=trim_body)
# Do we have any successes?
for attestation in attestations:
if attestation[0] == patatt.RES_VALID:
success = True
break
if success:
if trim_body:
                    # If we only succeeded after trimming the body, then we MUST set
                    # the body to the trimmed value, otherwise someone could append
                    # arbitrary content after the length defined by the l= value.
self._trim_body()
break
if not success and trim_body:
break
trim_body = True
for result, identity, signtime, keysrc, keyalgo, errors in attestations:
if keysrc and keysrc.startswith('(default keyring)/'):
fpr = keysrc.split('/', 1)[1]
uids = get_gpg_uids(fpr)
idmatch = False
for uid in uids:
if uid.find(identity) >= 0:
idmatch = True
break
if not idmatch:
# Take the first identity in the list and use that instead
parts = email.utils.parseaddr(uids[0])
identity = parts[1]
if signtime:
signdt = LoreAttestor.parse_ts(signtime)
else:
signdt = None
attestor = LoreAttestorPatatt(result, identity, signdt, keysrc, keyalgo, errors)
self._attestors.append(attestor)
def get_attestation_trailers(self, attpolicy: str, maxdays: int = 0) -> Tuple[str, list, bool]:
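        """Evaluate all attestors against the given policy and maximum age;
        returns a (checkmark, trailer-lines, critical) tuple, where critical
        is True only for failures under the hardfail policy.
        """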
trailers = list()
checkmark = None
critical = False
for attestor in self.attestors:
if attestor.passing and maxdays and not attestor.check_time_drift(self.date, maxdays):
logger.debug('The time drift is too much, marking as non-passing')
attestor.passing = False
if not attestor.passing:
# Is it a person-trailer for which we have a key?
if attestor.level == 'person':
if attestor.have_key:
# This was signed, and we have a key, but it's failing
trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer))
checkmark = attestor.checkmark
elif attpolicy in ('softfail', 'hardfail'):
trailers.append('%s No key: %s' % (attestor.checkmark, attestor.trailer))
# This is not critical even in hardfail
continue
elif attpolicy in ('softfail', 'hardfail'):
if not checkmark:
checkmark = attestor.checkmark
trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer))
if attpolicy == 'hardfail':
critical = True
else:
passing = False
if not checkmark:
checkmark = attestor.checkmark
if attestor.check_identity(self.fromemail):
passing = True
else:
# Do we have an x-original-from?
xofh = self.msg.get('X-Original-From')
if xofh:
logger.debug('Using X-Original-From for identity check')
xpair = email.utils.getaddresses([xofh])[0]
if attestor.check_identity(xpair[1]):
passing = True
# Fix our fromname and fromemail, mostly for thanks-tracking
self.fromname = xpair[0]
self.fromemail = xpair[1]
# Drop the reply-to header if it's exactly the same
for header in list(self.msg._headers): # noqa
if header[0].lower() == 'reply-to' and header[1].find(xpair[1]) > 0:
self.msg._headers.remove(header) # noqa
if passing:
trailers.append('%s Signed: %s' % (attestor.checkmark, attestor.trailer))
else:
trailers.append('%s Signed: %s (From: %s)' % (attestor.checkmark, attestor.trailer,
self.fromemail))
return checkmark, trailers, critical
def __repr__(self):
out = list()
out.append('msgid: %s' % self.msgid)
out.append(str(self.lsubject))
out.append(' fromname: %s' % self.fromname)
out.append(' fromemail: %s' % self.fromemail)
out.append(' date: %s' % str(self.date))
out.append(' in_reply_to: %s' % self.in_reply_to)
# Header-based info
out.append(' --- begin body ---')
for line in self.body.split('\n'):
out.append(' |%s' % line)
out.append(' --- end body ---')
# Body and body-based info
out.append(' has_diff: %s' % self.has_diff)
out.append(' has_diffstat: %s' % self.has_diffstat)
out.append(' --- begin my trailers ---')
for trailer in self.trailers:
out.append(' |%s' % str(trailer))
out.append(' --- begin followup trailers ---')
for trailer in self.followup_trailers:
out.append(' |%s' % str(trailer))
out.append(' --- end trailers ---')
out.append(' --- begin attestors ---')
for attestor in self.attestors:
out.append(' |%s' % str(attestor))
out.append(' --- end attestors ---')
return '\n'.join(out)
@staticmethod
def get_payload(msg: email.message.Message) -> Tuple[str, str]:
# walk until we find the first text/plain part
mcharset = msg.get_content_charset()
if not mcharset:
mcharset = 'utf-8'
mbody = None
for part in msg.walk():
cte = part.get_content_type()
if cte.find('/plain') < 0 and cte.find('/x-patch') < 0:
continue
payload = part.get_payload(decode=True)
if payload is None:
continue
pcharset = part.get_content_charset()
if not pcharset:
pcharset = mcharset
try:
payload = payload.decode(pcharset, errors='replace')
mcharset = pcharset
except LookupError:
# what kind of encoding is that?
# Whatever, we'll use utf-8 and hope for the best
payload = payload.decode('utf-8', errors='replace')
part.set_param('charset', 'utf-8')
mcharset = 'utf-8'
if mbody is None:
mbody = payload
continue
# If we already found a body, but we now find something that contains a diff,
# then we prefer this part
if DIFF_RE.search(payload):
mbody = payload
return mbody, mcharset
@staticmethod
def clean_header(hdrval):
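        """Decode a possibly RFC 2047-encoded header into a clean unicode
        string, quoting display names that need it and collapsing folded
        whitespace.
        """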
if hdrval is None:
return ''
if hdrval.find('=?') >= 0:
# Do we have any email addresses in there?
if re.search(r'<\S+@\S+>', hdrval, flags=re.I | re.M):
newaddrs = list()
for addr in email.utils.getaddresses([hdrval]):
if addr[0].find('=?') >= 0:
# Nothing wrong with nested calls, right?
addr = (LoreMessage.clean_header(addr[0]), addr[1])
# Work around https://github.com/python/cpython/issues/100900
if re.search(r'[^\w\s]', addr[0]):
newaddrs.append(f'"{addr[0]}" <{addr[1]}>')
else:
newaddrs.append(email.utils.formataddr(addr))
return ', '.join(newaddrs)
decoded = ''
for hstr, hcs in email.header.decode_header(hdrval):
if hcs is None:
hcs = 'utf-8'
try:
decoded += hstr.decode(hcs, errors='replace')
except LookupError:
# Try as utf-8
decoded += hstr.decode('utf-8', errors='replace')
except (UnicodeDecodeError, AttributeError):
decoded += hstr
else:
decoded = hdrval
new_hdrval = re.sub(r'\n?\s+', ' ', decoded)
return new_hdrval.strip()
@staticmethod
def wrap_header(hdr, width: int = 75, nl: str = '\n',
transform: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes:
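        """Wrap a single (name, value) header to the given width, handling
        address headers address-by-address, and optionally encoding to or
        decoding from RFC 2047 quoted-printable; returns raw bytes.
        """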
hname, hval = hdr
if hname.lower() in ('to', 'cc', 'from', 'x-original-from'):
_parts = [f'{hname}: ']
first = True
for addr in email.utils.getaddresses([hval]):
if transform == 'encode' and not addr[0].isascii():
addr = (email.quoprimime.header_encode(addr[0].encode(), charset='utf-8'), addr[1])
qp = format_addrs([addr], clean=False)
elif transform == 'decode':
qp = format_addrs([addr], clean=True)
else:
qp = format_addrs([addr], clean=False)
# See if there is enough room on the existing line
if first:
_parts[-1] += qp
first = False
continue
if len(_parts[-1] + ', ' + qp) > width:
_parts[-1] += ', '
_parts.append(qp)
continue
_parts[-1] += ', ' + qp
else:
if transform == 'decode' and hval.find('?=') >= 0:
hdata = f'{hname}: ' + LoreMessage.clean_header(hval)
else:
hdata = f'{hname}: {hval}'
if transform != 'encode' or hval.isascii():
if len(hdata) <= width:
return hdata.encode()
# Use simple textwrap, with a small trick that ensures that long non-breakable
# strings don't show up on the next line from the bare header
hdata = hdata.replace(': ', ':_', 1)
wrapped = textwrap.wrap(hdata, break_long_words=False, break_on_hyphens=False,
subsequent_indent=' ', width=width)
return nl.join(wrapped).replace(':_', ': ', 1).encode()
qp = f'{hname}: ' + email.quoprimime.header_encode(hval.encode(), charset='utf-8')
# is it longer than width?
if len(qp) <= width:
return qp.encode()
_parts = list()
while len(qp) > width:
wrapat = width - 2
if len(_parts):
# Also allow for the ' ' at the front on continuation lines
wrapat -= 1
# Make sure we don't break on a =XX escape sequence
while '=' in qp[wrapat-2:wrapat]:
wrapat -= 1
_parts.append(qp[:wrapat] + '?=')
qp = ('=?utf-8?q?' + qp[wrapat:])
_parts.append(qp)
return f'{nl} '.join(_parts).encode()
@staticmethod
def get_msg_as_bytes(msg: email.message.Message, nl: str = '\n',
headers: Literal['encode', 'decode', 'preserve'] = 'preserve') -> bytes:
bdata = b''
for hname, hval in msg.items():
bdata += LoreMessage.wrap_header((hname, str(hval)), nl=nl, transform=headers) + nl.encode()
bdata += nl.encode()
payload = msg.get_payload(decode=True)
for bline in payload.split(b'\n'):
bdata += re.sub(rb'[\r\n]*$', b'', bline) + nl.encode()
return bdata
@staticmethod
def get_parts_from_header(hstr: str) -> dict:
        hstr = re.sub(r'\s+', '', hstr)
hdata = dict()
for chunk in hstr.split(';'):
parts = chunk.split('=', 1)
if len(parts) < 2:
continue
hdata[parts[0]] = parts[1]
return hdata
@staticmethod
def get_clean_msgid(msg: email.message.Message, header='Message-Id') -> str:
msgid = None
raw = msg.get(header)
if raw:
matches = re.search(r'<([^>]+)>', LoreMessage.clean_header(raw))
if matches:
msgid = matches.groups()[0]
return msgid
@staticmethod
def get_preferred_duplicate(msg1: email.message.Message, msg2: email.message.Message) -> email.message.Message:
config = get_main_config()
listid1 = LoreMessage.get_clean_msgid(msg1, 'list-id')
if listid1:
prefidx1 = 0
for listglob in config['listid-preference']:
if fnmatch.fnmatch(listid1, listglob):
break
prefidx1 += 1
else:
prefidx1 = config['listid-preference'].index('*')
listid2 = LoreMessage.get_clean_msgid(msg2, 'list-id')
if listid2:
prefidx2 = 0
for listglob in config['listid-preference']:
if fnmatch.fnmatch(listid2, listglob):
break
prefidx2 += 1
else:
prefidx2 = config['listid-preference'].index('*')
if prefidx1 <= prefidx2:
logger.debug('Picked duplicate from preferred source: %s', listid1)
return msg1
logger.debug('Picked duplicate from preferred source: %s', listid2)
return msg2
@staticmethod
def get_patch_id(diff: str) -> Optional[str]:
gitargs = ['patch-id', '--stable']
ecode, out = git_run_command(None, gitargs, stdin=diff.encode())
if ecode > 0 or not len(out.strip()):
return None
return out.split(maxsplit=1)[0]
@staticmethod
def get_patchwork_hash(diff: str) -> str:
"""Generate a hash from a diff. Lifted verbatim from patchwork."""
prefixes = ['-', '+', ' ']
hashed = hashlib.sha1()
for line in diff.split('\n'):
if len(line) <= 0:
continue
hunk_match = HUNK_RE.match(line)
filename_match = FILENAME_RE.match(line)
if filename_match:
# normalise -p1 top-directories
if filename_match.group(1) == '---':
filename = 'a/'
else:
filename = 'b/'
filename += '/'.join(filename_match.group(2).split('/')[1:])
line = filename_match.group(1) + ' ' + filename
elif hunk_match:
# remove line numbers, but leave line counts
def fn(x):
if not x:
return 1
return int(x)
line_nos = list(map(fn, hunk_match.groups()))
line = '@@ -%d +%d @@' % tuple(line_nos)
elif line[0] in prefixes:
# if we have a +, - or context line, leave as-is
pass
else:
# other lines are ignored
continue
hashed.update((line + '\n').encode('utf-8'))
return hashed.hexdigest()
@staticmethod
def get_indexes(diff: str) -> Set[tuple]:
indexes = set()
oldfile = None
newfile = None
for line in diff.split('\n'):
if line.find('diff ') != 0 and line.find('index ') != 0:
continue
matches = re.search(r'^diff\s+--git\s+\w/(.*)\s+\w/(.*)$', line)
if matches:
oldfile = matches.groups()[0]
newfile = matches.groups()[1]
continue
matches = re.search(r'^index\s+([\da-f]+)\.\.[\da-f]+.*$', line)
if matches and oldfile is not None and newfile is not None:
indexes.add((oldfile, matches.groups()[0], newfile))
return indexes
@staticmethod
def find_trailers(body: str, followup: bool = False) -> Tuple[List[LoreTrailer], List[str]]:
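        """Split a message body into recognized trailers and all other lines;
        with followup=True, apply stricter rules to weed out things that
        merely look like trailers in free-form replies.
        """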
ignores = {'phone', 'email'}
headers = {'subject', 'date', 'from'}
links = {'link', 'buglink', 'closes'}
nonperson = links | {'fixes', 'subject', 'date', 'obsoleted-by', 'change-id', 'base-commit'}
# Ignore everything below standard email signature marker
body = body.split('\n-- \n', 1)[0].strip() + '\n'
# Fix some more common copypasta trailer wrapping
# Fixes: abcd0123 (foo bar
# baz quux)
body = re.sub(r'^(\S+:\s+[\da-f]+\s+\([^)]+)\n([^\n]+\))', r'\1 \2', body, flags=re.M)
# Signed-off-by: Long Name
# <email.here@example.com>
body = re.sub(r'^(\S+:\s+[^<]+)\n(<[^>]+>)$', r'\1 \2', body, flags=re.M)
# Signed-off-by: Foo foo <foo@foo.com>
# [for the thing that the thing is too long the thing that is
# thing but thing]
# (too false-positivey, commented out)
# body = re.sub(r'^(\[[^]]+)\n([^]]+]$)', r'\1 \2', body, flags=re.M)
trailers = list()
others = list()
was_trailer = False
for line in body.split('\n'):
line = line.strip('\r')
matches = re.search(r'^\s*(\w\S+):\s+(\S.*)', line, flags=re.I)
if matches:
oname, ovalue = list(matches.groups())
# We only accept headers if we haven't seen any non-trailer lines
lname = oname.lower()
if lname in ignores:
logger.debug('Ignoring known non-trailer: %s', line)
continue
if len(others) and lname in headers:
logger.debug('Ignoring %s (header after other content)', line)
continue
if followup:
if not lname.isascii():
logger.debug('Ignoring known non-ascii follow-up trailer: %s', lname)
continue
mperson = re.search(r'\S+@\S+\.\S+', ovalue)
if not mperson and lname not in nonperson:
logger.debug('Ignoring %s (not a recognized non-person trailer)', line)
continue
mlink = re.search(r'https?://', ovalue)
if mlink and lname not in links:
logger.debug('Ignoring %s (not a recognized link trailer)', line)
continue
extinfo = None
mextinfo = re.search(r'(.*\S+)(\s+#[^#]+)$', ovalue)
if mextinfo:
logger.debug('Trailer contains hashtag extinfo: %s', line)
# Found extinfo of the hashtag genre
egr = mextinfo.groups()
ovalue = egr[0]
extinfo = egr[1]
was_trailer = True
ltrailer = LoreTrailer(name=oname, value=ovalue, extinfo=extinfo)
trailers.append(ltrailer)
continue
# Is it an extended info line, e.g.:
# Signed-off-by: Foo Foo <foo@foo.com>
# [for the foo bits]
if len(line) > 2 and was_trailer and re.search(r'^\s*\[[^]]+]\s*$', line):
trailers[-1].extinfo = line
was_trailer = False
continue
was_trailer = False
others.append(line)
return trailers, others
@staticmethod
def rebuild_message(headers: List[LoreTrailer], message: str, trailers: List[LoreTrailer],
basement: str, signature: str) -> str:
body = ''
if headers:
for ltr in headers:
# There is no [extdata] in git headers, so we omit it
body += ltr.as_string(omit_extinfo=True) + '\n'
body += '\n'
if len(message):
body += message.rstrip('\r\n') + '\n'
if len(trailers):
body += '\n'
for ltr in trailers:
body += ltr.as_string() + '\n'
if len(basement):
if not len(trailers):
body += '\n'
body += '---\n'
body += basement.rstrip('\r\n') + '\n'
if len(signature):
body += '-- \n'
body += signature.rstrip('\r\n') + '\n'
return body
@staticmethod
def get_body_parts(body: str) -> Tuple[List[LoreTrailer], str, List[LoreTrailer], str, str]:
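        """Split a patch body into its five structural parts, returned as
        (git headers, commit message, trailers, basement below the --- marker,
        signature below the '-- ' marker).
        """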
# remove any starting/trailing blank lines
body = body.replace('\r', '')
body = body.strip('\n')
# Extra git-relevant headers, like From:, Subject:, Date:, etc
githeaders = list()
# commit message
message = ''
# everything below the ---
basement = ''
# conformant signature --\s\n
signature = ''
sparts = body.rsplit('\n-- \n', 1)
if len(sparts) > 1:
signature = sparts[1]
body = sparts[0].rstrip('\n')
parts = re.split('^---\n', body, maxsplit=1, flags=re.M)
if len(parts) == 2:
basement = parts[1].rstrip('\n')
elif body.find('\ndiff ') >= 0:
parts = body.split('\ndiff ', 1)
if len(parts) == 2:
parts[1] = 'diff ' + parts[1]
basement = parts[1].rstrip('\n')
mbody = parts[0].strip('\n')
# Split into paragraphs
bpara = mbody.split('\n\n')
# Is every line of the first part in a header format?
mparts = list()
h, o = LoreMessage.find_trailers(bpara[0])
if len(o):
# Not everything was a header, so we don't treat it as headers
mparts.append(bpara[0])
else:
githeaders = h
# Any lines of the last part match the header format?
trailers, nlines = LoreMessage.find_trailers(bpara[-1])
if len(bpara) == 1:
if githeaders == trailers:
# This is a message that consists of just trailers?
githeaders = list()
if nlines:
message = '\n'.join(nlines)
return githeaders, message, trailers, basement, signature
# Add all parts between first and last to mparts
if len(bpara) > 2:
mparts += bpara[1:-1]
if len(nlines):
# Add them as the last part
mparts.append('\n'.join(nlines))
message = '\n\n'.join(mparts)
return githeaders, message, trailers, basement, signature
def fix_trailers(self, extras: Optional[List[LoreTrailer]] = None,
copyccs: bool = False, addmysob: bool = False,
fallback_order: str = '*',
omit_trailers: Optional[List[str]] = None) -> None:
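        """Rewrite self.body with follow-up and extra trailers applied,
        honoring trailer-order, trailers-ignore-from, copyccs and addmysob
        settings, and keeping our own Signed-off-by at the bottom.
        """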
config = get_main_config()
bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body)
sobtr = LoreTrailer()
hasmysob = False
if sobtr in btrailers:
# Our own signoff always moves to the bottom of all trailers
hasmysob = True
btrailers.remove(sobtr)
        new_trailers = list(self.followup_trailers)
if extras:
new_trailers += extras
if sobtr in new_trailers:
# Our own signoff always moves to the bottom of all trailers
new_trailers.remove(sobtr)
addmysob = True
if copyccs:
alldests = email.utils.getaddresses([str(x) for x in self.msg.get_all('to', [])])
alldests += email.utils.getaddresses([str(x) for x in self.msg.get_all('cc', [])])
# Sort by domain name, then local
alldests.sort(key=lambda x: x[1].find('@') > 0 and x[1].split('@')[1] + x[1].split('@')[0] or x[1])
for pair in alldests:
found = False
for fltr in btrailers + new_trailers:
if fltr.email_eq(pair[1]):
# already present
found = True
break
if not found:
if len(pair[0]):
altr = LoreTrailer(name='Cc', value=f'{pair[0]} <{pair[1]}>')
else:
altr = LoreTrailer(name='Cc', value=pair[1])
new_trailers.append(altr)
torder = config.get('trailer-order', fallback_order)
if torder and torder != '*':
# this only applies to trailers within our chain of custody, so walk existing
# body trailers backwards and stop at the outermost Signed-off-by we find (if any)
for bltr in reversed(btrailers):
if bltr.lname == 'signed-off-by':
break
btrailers.remove(bltr)
new_trailers.insert(0, bltr)
ordered_trailers = list()
for glob in [x.strip().lower() for x in torder.split(',')]:
if not len(new_trailers):
break
for ltr in list(new_trailers):
if fnmatch.fnmatch(ltr.lname, glob):
ordered_trailers.append(ltr)
new_trailers.remove(ltr)
if len(new_trailers):
# Tack them to the bottom
ordered_trailers += new_trailers
new_trailers = ordered_trailers
attpolicy = config['attestation-policy']
fixtrailers = btrailers
# load trailers we should ignore
ignore_from = config.get('trailers-ignore-from')
if ignore_from:
ignores = [x[1].lower() for x in email.utils.getaddresses([ignore_from])]
else:
ignores = list()
ignored = set()
for ltr in new_trailers:
if ltr in fixtrailers or ltr in ignored:
continue
if ltr.addr and ltr.addr[1].lower() in ignores:
logger.info(' x %s', ltr.as_string(omit_extinfo=True))
ignored.add(ltr)
continue
fixtrailers.append(ltr)
extra = ''
if ltr.lmsg is not None:
for attestor in ltr.lmsg.attestors:
if attestor.passing:
extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
elif attpolicy in ('hardfail', 'softfail'):
extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
if attpolicy == 'hardfail':
logger.critical('---')
logger.critical('Exiting due to attestation-policy: hardfail')
sys.exit(1)
logger.info(' + %s%s', ltr.as_string(omit_extinfo=True), extra)
elif extras is not None and ltr in extras:
logger.info(' + %s%s', ltr.as_string(omit_extinfo=True), extra)
if addmysob or hasmysob:
# Tack on our signoff at the bottom
fixtrailers.append(sobtr)
if not hasmysob:
logger.info(' + %s', sobtr.as_string(omit_extinfo=True))
if omit_trailers and fixtrailers:
            for ltr in list(fixtrailers):
if ltr.lname in omit_trailers:
fixtrailers.remove(ltr)
# Build the new commit message in case we're working directly
# on the tree.
self.message = self.subject + '\n\n'
if len(message):
self.message += message.rstrip('\r\n') + '\n'
if len(fixtrailers):
self.message += '\n'
if len(fixtrailers):
for ltr in fixtrailers:
self.message += ltr.as_string() + '\n'
# Split the basement along '---', in case there is extra info in the
# message of the commit (used by devs to keep extra info about the patch)
bparts = re.split(r'^---\n', basement, flags=re.M)
for bpart in list(bparts):
# If it's a diff or diffstat, we don't care to keep it
if DIFF_RE.search(bpart) or DIFFSTAT_RE.search(bpart):
bparts.remove(bpart)
if bparts:
self.message += '---\n' + '---\n'.join(bparts)
self.body = LoreMessage.rebuild_message(bheaders, message, fixtrailers, basement, signature)
def get_am_subject(self, indicate_reroll=True, use_subject=None):
# Return a clean patch subject
parts = ['PATCH']
if self.lsubject.rfc:
parts.append('RFC')
if self.reroll_from_revision:
if indicate_reroll:
if self.reroll_from_revision != self.revision:
parts.append('v%d->v%d' % (self.reroll_from_revision, self.revision))
else:
parts.append(' %s v%d' % (' ' * len(str(self.reroll_from_revision)), self.revision))
else:
parts.append('v%d' % self.revision)
elif not self.revision_inferred:
parts.append('v%d' % self.revision)
if not self.lsubject.counters_inferred:
parts.append('%d/%d' % (self.lsubject.counter, self.lsubject.expected))
if not use_subject:
use_subject = self.lsubject.subject
return '[%s] %s' % (' '.join(parts), use_subject)
def get_am_message(self, add_trailers=True, addmysob=False, extras=None, copyccs=False, allowbadchars=False):
# Look through the body to make sure there aren't any suspicious unicode control flow chars
# First, encode into ascii and compare for a quickie utf8 presence test
if not allowbadchars and self.body.encode('ascii', errors='replace') != self.body.encode():
import unicodedata
logger.debug('Body contains non-ascii characters. Running Unicode Cf char tests.')
for line in self.body.split('\n'):
# Does this line have any unicode?
if line.encode() == line.encode('ascii', errors='replace'):
continue
ucats = {unicodedata.category(ch) for ch in line.rstrip('\r')}
            # If we have Cf ("format", e.g. invisible directional characters) but no
            # Lo ("letter, other") characters indicating a non-latin language, then
            # there's likely something funky going on
if 'Cf' in ucats and 'Lo' not in ucats:
# find the offending char
at = 0
for c in line.rstrip('\r'):
if unicodedata.category(c) == 'Cf':
logger.critical('---')
logger.critical('WARNING: Message contains suspicious unicode control characters!')
logger.critical(' Subject: %s', self.full_subject)
logger.critical(' Line: %s', line.rstrip('\r'))
logger.critical(' ------%s^', '-'*at)
logger.critical(' Char: %s (%s)', unicodedata.name(c), hex(ord(c)))
logger.critical(' If you are sure about this, rerun with the right flag to allow.')
sys.exit(1)
at += 1
# Remove anything that's cut off by scissors
mi_msg = email.message.EmailMessage()
mi_msg['From'] = self.msg['From']
mi_msg['Date'] = self.msg['Date']
mi_msg['Subject'] = self.msg['Subject']
mi_msg.set_payload(self.body, charset='utf-8')
mi_msg.set_charset('utf-8')
i, m, p = get_mailinfo(mi_msg.as_bytes(policy=emlpolicy), scissors=True)
self.body = m.decode() + p.decode()
if add_trailers:
self.fix_trailers(copyccs=copyccs, addmysob=addmysob, extras=extras)
am_msg = email.message.EmailMessage()
if i.get('Author'):
hfrom = f'{i.get("Author")} <{i.get("Email")}>'
else:
hfrom = i.get('Email')
am_msg.add_header('Subject', self.get_am_subject(indicate_reroll=False, use_subject=i.get('Subject')))
am_msg.add_header('From', hfrom)
am_msg.add_header('Date', i.get('Date'))
am_msg.add_header('Message-Id', f'<{self.msgid}>')
am_msg.set_payload(self.body, charset='utf-8')
return am_msg
class LoreSubject:
def __init__(self, subject):
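        """Parse a subject line into its components, e.g.
        '[PATCH RFC v2 3/7] foo: bar' gives revision=2, counter=3,
        expected=7, rfc=True, patch=True, subject='foo: bar'."""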
# Subject-based info
self.full_subject = None
self.subject = None
self.reply = False
self.resend = False
self.patch = False
self.rfc = False
self.revision = 1
self.counter = 1
self.expected = 1
self.revision_inferred = True
self.counters_inferred = True
self.prefixes = list()
subject = re.sub(r'\s+', ' ', LoreMessage.clean_header(subject)).strip()
self.full_subject = subject
# Is it a reply?
if re.search(r'^(Re|Aw|Fwd):', subject, re.I) or re.search(r'^\w{2,3}:\s*\[', subject):
self.reply = True
self.subject = subject
# We don't care to parse the rest
return
# Remove any brackets inside brackets
while True:
oldsubj = subject
subject = re.sub(r'\[([^]]*)\[([^\[\]]*)]', r'[\1\2]', subject)
subject = re.sub(r'\[([^]]*)]([^\[\]]*)]', r'[\1\2]', subject)
if oldsubj == subject:
break
# Find all [foo] in the title
while subject.find('[') == 0:
matches = re.search(r'^\[([^]]*)]', subject)
if not matches:
break
bracketed = matches.groups()[0].strip()
# Fix [PATCHv3] to be properly [PATCH v3]
bracketed = re.sub(r'(patch)(v\d+)', r'\1 \2', bracketed, flags=re.I)
for chunk in bracketed.split():
# Remove any trailing commas or semicolons
chunk = chunk.strip(',;')
if re.search(r'^\d{1,4}/\d{1,4}$', chunk):
counters = chunk.split('/')
self.counter = int(counters[0])
self.expected = int(counters[1])
self.counters_inferred = False
elif re.search(r'^v\d+$', chunk, re.IGNORECASE):
self.revision = int(chunk[1:])
self.revision_inferred = False
elif chunk.lower().find('rfc') == 0:
self.rfc = True
elif chunk.lower().find('resend') == 0:
self.resend = True
elif chunk.lower().find('patch') == 0:
self.patch = True
self.prefixes.append(chunk)
subject = re.sub(r'^\s*\[[^]]*]\s*', '', subject)
self.subject = subject
def get_extra_prefixes(self, exclude: Optional[List[str]] = None) -> List[str]:
ret = list()
for _prf in self.prefixes:
if exclude and _prf in exclude:
continue
if _prf.lower() == 'patch':
continue
elif re.search(r'v\d+', _prf, flags=re.I):
continue
elif re.search(r'\d+/\d+', _prf):
continue
ret.append(_prf)
return ret
def get_rebuilt_subject(self, eprefixes: Optional[List[str]] = None):
_pfx = self.get_extra_prefixes()
if eprefixes:
for _epfx in eprefixes:
if _epfx not in _pfx:
_pfx.append(_epfx)
if self.revision > 1:
_pfx.append(f'v{self.revision}')
if self.expected > 1:
_pfx.append('%s/%s' % (str(self.counter).zfill(len(str(self.expected))), self.expected))
if len(_pfx):
return '[PATCH ' + ' '.join(_pfx) + '] ' + self.subject
else:
return f'[PATCH] {self.subject}'
def get_slug(self, sep='_', with_counter: bool = True):
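        # E.g. counter=3, subject='Fix foo: bar' gives '0003_fix_foo_bar'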
unsafe = self.subject
if with_counter:
unsafe = '%04d%s%s' % (self.counter, sep, unsafe)
return re.sub(r'\W+', sep, unsafe).strip(sep).lower()
def __repr__(self):
out = list()
out.append(' full_subject: %s' % self.full_subject)
out.append(' subject: %s' % self.subject)
out.append(' reply: %s' % self.reply)
out.append(' resend: %s' % self.resend)
out.append(' patch: %s' % self.patch)
out.append(' rfc: %s' % self.rfc)
out.append(' revision: %s' % self.revision)
out.append(' revision_inferred: %s' % self.revision_inferred)
out.append(' counter: %s' % self.counter)
out.append(' expected: %s' % self.expected)
out.append(' counters_inferred: %s' % self.counters_inferred)
out.append(' prefixes: %s' % ', '.join(self.prefixes))
return '\n'.join(out)
class LoreAttestor:
mode: Optional[str]
level: Optional[str]
identity: Optional[str]
    signtime: Optional[datetime.datetime]
keysrc: Optional[str]
keyalgo: Optional[str]
passing: bool
have_key: bool
errors: list
def __init__(self) -> None:
self.mode = None
self.level = None
self.identity = None
self.signtime = None
self.keysrc = None
self.keyalgo = None
self.passing = False
self.have_key = False
self.errors = list()
@property
def checkmark(self) -> str:
config = get_main_config()
if config['attestation-checkmarks'] == 'fancy':
if self.passing:
return ATT_PASS_FANCY
return ATT_FAIL_FANCY
if self.passing:
return ATT_PASS_SIMPLE
return ATT_FAIL_SIMPLE
@property
def trailer(self):
if self.keyalgo:
mode = self.keyalgo
else:
mode = self.mode
return '%s/%s' % (mode, self.identity.lower())
def check_time_drift(self, emldate, maxdays: int = 30) -> bool:
if not self.passing or self.signtime is None:
return False
maxdrift = datetime.timedelta(days=maxdays)
sdrift = self.signtime - emldate
if sdrift > maxdrift:
self.errors.append('Time drift between Date and t too great (%s)' % sdrift)
return False
logger.debug('PASS : time drift between Date and t (%s)', sdrift)
return True
def check_identity(self, emlfrom: str) -> bool:
if not self.passing or not emlfrom:
return False
if self.level == 'domain':
if emlfrom.lower().endswith('@' + self.identity.lower()):
logger.debug('PASS : sig domain %s matches from identity %s', self.identity, emlfrom)
return True
self.errors.append('signing domain %s does not match From: %s' % (self.identity, emlfrom))
return False
if emlfrom.lower() == self.identity.lower():
logger.debug('PASS : sig identity %s matches from identity %s', self.identity, emlfrom)
return True
self.errors.append('signing identity %s does not match From: %s' % (self.identity, emlfrom))
return False
@staticmethod
def parse_ts(ts: Optional[str]):
try:
            return datetime.datetime.fromtimestamp(int(ts), tz=datetime.timezone.utc)
except: # noqa
logger.debug('Failed parsing t=%s', ts)
return None
def __repr__(self):
out = list()
out.append(' mode: %s' % self.mode)
out.append(' level: %s' % self.level)
out.append('identity: %s' % self.identity)
out.append('signtime: %s' % self.signtime)
out.append(' keysrc: %s' % self.keysrc)
out.append(' keyalgo: %s' % self.keyalgo)
out.append(' passing: %s' % self.passing)
out.append('have_key: %s' % self.have_key)
out.append(' errors: %s' % ','.join(self.errors))
return '\n'.join(out)
class LoreAttestorDKIM(LoreAttestor):
    def __init__(self, passing: bool, identity: str, signtime: Optional[datetime.datetime], errors: list) -> None:
super().__init__()
self.mode = 'DKIM'
self.level = 'domain'
self.keysrc = 'DNS'
self.signtime = signtime
self.passing = passing
self.errors = errors
if identity.find('@') >= 0:
self.identity = identity.split('@')[1]
else:
self.identity = identity
class LoreAttestorPatatt(LoreAttestor):
    def __init__(self, result: int, identity: str, signtime: Optional[datetime.datetime], keysrc: str, keyalgo: str,
errors: list) -> None:
super().__init__()
self.mode = 'patatt'
self.level = 'person'
self.identity = identity
self.signtime = signtime
self.keysrc = keysrc
self.keyalgo = keyalgo
self.errors = errors
if result == patatt.RES_VALID:
self.passing = True
self.have_key = True
elif result >= patatt.RES_BADSIG:
self.have_key = True
def _run_command(cmdargs: List[str], stdin: Optional[bytes] = None,
rundir: Optional[str] = None) -> Tuple[int, bytes, bytes]:
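    """Run a command, optionally chdir-ing into rundir first.

    Returns a (returncode, stdout, stderr) tuple with raw bytes output.
    """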
if rundir:
logger.debug('Changing dir to %s', rundir)
curdir = os.getcwd()
os.chdir(rundir)
else:
curdir = None
    logger.debug('Running %s', ' '.join(cmdargs))
sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
(output, error) = sp.communicate(input=stdin)
if curdir:
logger.debug('Changing back into %s', curdir)
os.chdir(curdir)
return sp.returncode, output, error
def gpg_run_command(args: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
config = get_main_config()
cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
if config['attestation-gnupghome'] is not None:
cmdargs += ['--homedir', config['attestation-gnupghome']]
cmdargs += args
return _run_command(cmdargs, stdin=stdin)
def git_run_command(gitdir: Optional[str], args: List[str], stdin: Optional[bytes] = None,
logstderr: bool = False, decode: bool = True) -> Tuple[int, Union[str, bytes]]:
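    """Run git with the given args, optionally against a specific gitdir.

    Returns a (returncode, output) tuple; output is decoded unless
    decode=False, and stderr is appended to it when logstderr=True.
    """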
cmdargs = ['git', '--no-pager']
if gitdir:
if os.path.exists(os.path.join(gitdir, '.git')):
gitdir = os.path.join(gitdir, '.git')
cmdargs += ['--git-dir', gitdir]
# counteract some potential local settings
if args[0] == 'log':
args.insert(1, '--no-abbrev-commit')
cmdargs += args
ecode, out, err = _run_command(cmdargs, stdin=stdin)
if decode:
out = out.decode(errors='replace')
if logstderr and len(err.strip()):
if decode:
err = err.decode(errors='replace')
logger.debug('Stderr: %s', err)
out += err
return ecode, out
def git_credential_fill(gitdir: Optional[str], protocol: str, host: str, username: str) -> Optional[str]:
stdin = f'protocol={protocol}\nhost={host}\nusername={username}\n'.encode()
ecode, out = git_run_command(gitdir, args=['credential', 'fill'], stdin=stdin)
if ecode == 0:
for line in out.splitlines():
if not line.startswith('password='):
continue
chunks = line.split('=', maxsplit=1)
return chunks[1]
return None
def git_get_command_lines(gitdir: Optional[str], args: list) -> List[str]:
ecode, out = git_run_command(gitdir, args)
lines = list()
if out:
for line in out.split('\n'):
if line == '':
continue
lines.append(line)
return lines
def git_get_repo_status(gitdir: Optional[str] = None, untracked: bool = False) -> List[str]:
args = ['status', '--porcelain=v1']
if not untracked:
args.append('--untracked-files=no')
return git_get_command_lines(gitdir, args)
@contextmanager
def git_temp_worktree(gitdir=None, commitish=None):
"""Context manager that creates a temporary work tree and chdirs into it. The
worktree is deleted when the contex manager is closed. Taken from gj_tools."""
dfn = None
try:
with tempfile.TemporaryDirectory() as dfn:
gitargs = ['worktree', 'add', '--detach', '--no-checkout', dfn]
if commitish:
gitargs.append(commitish)
git_run_command(gitdir, gitargs)
with in_directory(dfn):
yield dfn
finally:
if dfn is not None:
git_run_command(gitdir, ['worktree', 'remove', '--force', dfn])
@contextmanager
def git_temp_clone(gitdir=None):
"""Context manager that creates a temporary shared clone."""
if gitdir is None:
topdir = git_get_toplevel()
if topdir and os.path.isdir(os.path.join(topdir, '.git')):
gitdir = os.path.join(topdir, '.git')
    if not gitdir:
        logger.critical('Current directory is not a git checkout. Try using -g.')
        # Bailing out with a bare "return" before the first yield would make
        # the context manager raise a RuntimeError, so yield None instead and
        # let the caller check for failure
        yield None
        return
with tempfile.TemporaryDirectory() as dfn:
gitargs = ['clone', '--mirror', '--shared', gitdir, dfn]
git_run_command(None, gitargs)
yield dfn
@contextmanager
def in_directory(dirname):
"""Context manager that chdirs into a directory and restores the original
directory when closed. Taken from gj_tools."""
cdir = os.getcwd()
try:
os.chdir(dirname)
yield True
finally:
os.chdir(cdir)
def git_set_config(fullpath: Optional[str], param: str, value: str, operation: str = '--replace-all'):
args = ['config', operation, param, value]
ecode, out = git_run_command(fullpath, args)
return ecode
def get_config_from_git(regexp: str, defaults: Optional[dict] = None,
multivals: Optional[list] = None, source: Optional[str] = None) -> dict:
if multivals is None:
multivals = list()
args = ['config']
if source:
args += ['--file', source]
args += ['-z', '--get-regexp', regexp]
ecode, out = git_run_command(None, args)
gitconfig = defaults
if not gitconfig:
gitconfig = dict()
if not out:
return gitconfig
for line in out.split('\x00'):
if not line:
continue
        try:
            # A config entry without a value will not have the \n separator
            key, value = line.split('\n', 1)
            chunks = key.split('.')
cfgkey = chunks[-1].lower()
if cfgkey in multivals:
if cfgkey not in gitconfig:
gitconfig[cfgkey] = list()
gitconfig[cfgkey].append(value)
else:
gitconfig[cfgkey] = value
except ValueError:
logger.debug('Ignoring git config entry %s', line)
return gitconfig
def get_main_config() -> dict:
global MAIN_CONFIG
if MAIN_CONFIG is None:
defcfg = copy.deepcopy(DEFAULT_CONFIG)
# some options can be provided via the toplevel .b4-config file,
# so load them up and use as defaults
topdir = git_get_toplevel()
wtglobs = ['send-*', '*mask', '*template*', 'trailer*', 'pw-*']
if topdir:
wtcfg = os.path.join(topdir, '.b4-config')
if os.access(wtcfg, os.R_OK):
logger.debug('Loading worktree configs from %s', wtcfg)
wtconfig = get_config_from_git(r'b4\..*', source=wtcfg)
logger.debug('wtcfg=%s', wtconfig)
for key, val in wtconfig.items():
if val.startswith('./'):
# replace it with full topdir path
val = os.path.abspath(os.path.join(topdir, val))
for wtglob in wtglobs:
if fnmatch.fnmatch(key, wtglob):
logger.debug('wtcfg: %s=%s', key, val)
defcfg[key] = val
break
config = get_config_from_git(r'b4\..*', defaults=defcfg, multivals=['keyringsrc'])
        config['listid-preference'] = config['listid-preference'].split(',')
        # Make sure the catch-all '*' is always present, and always last
        if '*' in config['listid-preference']:
            config['listid-preference'].remove('*')
        config['listid-preference'].append('*')
if config['gpgbin'] is None:
gpgcfg = get_config_from_git(r'gpg\..*', {'program': 'gpg'})
config['gpgbin'] = gpgcfg['program']
MAIN_CONFIG = config
return MAIN_CONFIG
def get_data_dir(appname: str = 'b4') -> str:
if 'XDG_DATA_HOME' in os.environ:
datahome = os.environ['XDG_DATA_HOME']
else:
datahome = os.path.join(str(pathlib.Path.home()), '.local', 'share')
datadir = os.path.join(datahome, appname)
pathlib.Path(datadir).mkdir(parents=True, exist_ok=True)
return datadir
def get_cache_dir(appname: str = 'b4') -> str:
global _CACHE_CLEANED
if 'XDG_CACHE_HOME' in os.environ:
cachehome = os.environ['XDG_CACHE_HOME']
else:
cachehome = os.path.join(str(pathlib.Path.home()), '.cache')
cachedir = os.path.join(cachehome, appname)
pathlib.Path(cachedir).mkdir(parents=True, exist_ok=True)
if _CACHE_CLEANED:
return cachedir
# Delete all .mbx and .lookup files older than cache-expire
config = get_main_config()
try:
expmin = int(config['cache-expire']) * 60
except ValueError:
logger.critical('ERROR: cache-expire must be an integer (minutes): %s', config['cache-expire'])
expmin = 600
expage = time.time() - expmin
for entry in os.listdir(cachedir):
        if not entry.endswith(('.mbx', '.lookup', '.msgs')):
continue
fullpath = os.path.join(cachedir, entry)
st = os.stat(fullpath)
if st.st_mtime < expage:
logger.debug('Cleaning up cache: %s', entry)
if os.path.isdir(fullpath):
shutil.rmtree(fullpath)
else:
os.unlink(os.path.join(cachedir, entry))
_CACHE_CLEANED = True
return cachedir
def get_cache_file(identifier: str, suffix: Optional[str] = None):
cachedir = get_cache_dir()
cachefile = hashlib.sha1(identifier.encode()).hexdigest()
if suffix:
cachefile = f'{cachefile}.{suffix}'
return os.path.join(cachedir, cachefile)
def get_cache(identifier: str, suffix: Optional[str] = None) -> Optional[str]:
fullpath = get_cache_file(identifier, suffix=suffix)
try:
with open(fullpath) as fh:
logger.debug('Using cache %s for %s', fullpath, identifier)
return fh.read()
except FileNotFoundError:
logger.debug('Cache miss for %s', identifier)
return None
def clear_cache(identifier: str, suffix: Optional[str] = None) -> None:
fullpath = get_cache_file(identifier, suffix=suffix)
if os.path.exists(fullpath):
os.unlink(fullpath)
logger.debug('Removed cache %s for %s', fullpath, identifier)
def save_cache(contents: str, identifier: str, suffix: Optional[str] = None, mode: str = 'w') -> None:
fullpath = get_cache_file(identifier, suffix=suffix)
try:
with open(fullpath, mode) as fh:
fh.write(contents)
logger.debug('Saved cache %s for %s', fullpath, identifier)
except FileNotFoundError:
logger.debug('Could not write cache %s for %s', fullpath, identifier)
def get_user_config():
global USER_CONFIG
if USER_CONFIG is None:
USER_CONFIG = get_config_from_git(r'user\..*')
if 'name' not in USER_CONFIG:
udata = pwd.getpwuid(os.getuid())
USER_CONFIG['name'] = udata.pw_gecos
if 'email' not in USER_CONFIG:
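            # Fall back to the EMAIL environment variable; this will raise
            # a KeyError if neither user.email nor EMAIL are set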
USER_CONFIG['email'] = os.environ['EMAIL']
return USER_CONFIG
def get_requests_session():
global REQSESSION
if REQSESSION is None:
REQSESSION = requests.session()
REQSESSION.headers.update({'User-Agent': 'b4/%s' % __VERSION__})
return REQSESSION
def get_msgid_from_stdin() -> Optional[str]:
if not sys.stdin.isatty():
from email.parser import BytesParser
message = BytesParser().parsebytes(
sys.stdin.buffer.read(), headersonly=True)
return message.get('Message-ID', None)
return None
def get_msgid(cmdargs: argparse.Namespace) -> Optional[str]:
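    """Figure out the message-id to work with.

    Accepts a raw msgid (with or without <>), a patchwork patch URL, a
    public-inbox URL (in which case midmask is overridden to match that
    host), or an 'id:'/'rfc822msgid:' prefixed string. Falls back to
    reading a message from stdin unless disabled via cmdargs.no_stdin.
    """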
if not cmdargs.msgid and not cmdargs.no_stdin:
logger.debug('Getting Message-ID from stdin')
msgid = get_msgid_from_stdin()
else:
msgid = cmdargs.msgid
if msgid is None:
return None
msgid = msgid.strip('<>')
# Handle the case when someone pastes a full URL to the message
# Is this a patchwork URL?
matches = re.search(r'^https?://.*/project/.*/patch/([^/]+@[^/]+)', msgid, re.IGNORECASE)
if matches:
logger.debug('Looks like a patchwork URL')
chunks = matches.groups()
msgid = urllib.parse.unquote(chunks[0])
return msgid
# Does it look like a public-inbox URL?
matches = re.search(r'^https?://[^/]+/([^/]+)/([^/]+@[^/]+)', msgid, re.IGNORECASE)
if matches:
chunks = matches.groups()
config = get_main_config()
myloc = urllib.parse.urlparse(config['midmask'])
wantloc = urllib.parse.urlparse(msgid)
if myloc.netloc != wantloc.netloc:
logger.debug('Overriding midmask with passed url parameters')
config['midmask'] = f'{wantloc.scheme}://{wantloc.netloc}/{chunks[0]}/%s'
msgid = urllib.parse.unquote(chunks[1])
# Handle special case when msgid is prepended by id: or rfc822msgid:
    if re.search(r'^\w*id:', msgid):
        msgid = re.sub(r'^\w*id:', '', msgid)
return msgid
def get_strict_thread(msgs, msgid, noparent=False):
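    """Reduce msgs to only the messages belonging to the thread of msgid.

    Walks the messages repeatedly, following In-Reply-To and References
    headers, until the set of wanted message-ids stops growing. With
    noparent=True, the parents of msgid are ignored, effectively breaking
    off a subthread. Returns None if nothing matches.
    """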
want = {msgid}
ignore = set()
got = set()
seen = set()
maybe = dict()
strict = list()
while True:
for msg in msgs:
c_msgid = LoreMessage.get_clean_msgid(msg)
if c_msgid in ignore:
continue
seen.add(c_msgid)
if c_msgid in got:
continue
logger.debug('Looking at: %s', c_msgid)
refs = set()
msgrefs = list()
if msg.get('In-Reply-To', None):
msgrefs += email.utils.getaddresses([str(x) for x in msg.get_all('in-reply-to', [])])
if msg.get('References', None):
msgrefs += email.utils.getaddresses([str(x) for x in msg.get_all('references', [])])
# If noparent is set, we pretend the message we got passed has no references, and add all
# parent references of this message to ignore
if noparent and msgid == c_msgid:
logger.info('Breaking thread to remove parents of %s', msgid)
ignore = set([x[1] for x in msgrefs])
msgrefs = list()
for ref in set([x[1] for x in msgrefs]):
if ref in ignore:
continue
if ref in got or ref in want:
want.add(c_msgid)
elif len(ref):
refs.add(ref)
if c_msgid not in want:
if ref not in maybe:
maybe[ref] = set()
logger.debug('Going into maybe: %s->%s', ref, c_msgid)
maybe[ref].add(c_msgid)
if c_msgid in want:
strict.append(msg)
got.add(c_msgid)
want.update(refs)
want.discard(c_msgid)
logger.debug('Kept in thread: %s', c_msgid)
if c_msgid in maybe:
# Add all these to want
want.update(maybe[c_msgid])
maybe.pop(c_msgid)
# Add all maybes that have the same ref into want
for ref in refs:
if ref in maybe:
want.update(maybe[ref])
maybe.pop(ref)
# Remove any entries not in "seen" (missing messages)
for c_msgid in set(want):
if c_msgid not in seen or c_msgid in got:
want.remove(c_msgid)
if not len(want):
break
if not len(strict):
return None
if len(msgs) > len(strict):
logger.debug('Reduced thread to requested matches only (%s->%s)', len(msgs), len(strict))
return strict
def mailsplit_bytes(bmbox: bytes, outdir: str, pipesep: Optional[str] = None) -> List[email.message.Message]:
msgs = list()
if pipesep:
logger.debug('Mailsplitting using pipesep=%s', pipesep)
if '\\' in pipesep:
import codecs
pipesep = codecs.decode(pipesep.encode(), 'unicode_escape')
for chunk in bmbox.split(pipesep.encode()):
if chunk.strip():
msgs.append(email.message_from_bytes(chunk, policy=emlpolicy))
return msgs
logger.debug('Mailsplitting the mbox into %s', outdir)
args = ['mailsplit', '--mboxrd', '-o%s' % outdir]
ecode, out = git_run_command(None, args, stdin=bmbox)
if ecode > 0:
logger.critical('Unable to parse mbox received from the server')
return msgs
# Read in the files
for msg in os.listdir(outdir):
with open(os.path.join(outdir, msg), 'rb') as fh:
msgs.append(email.message_from_binary_file(fh, policy=emlpolicy))
return msgs
def get_pi_search_results(query: str, nocache: bool = False) -> Optional[List[email.message.Message]]:
config = get_main_config()
searchmask = config.get('searchmask')
if not searchmask:
logger.critical('b4.searchmask is not defined')
return None
msgs = list()
query = urllib.parse.quote_plus(query)
query_url = searchmask % query
cachedir = get_cache_file(query_url, 'pi.msgs')
if os.path.exists(cachedir) and not nocache:
logger.debug('Using cached copy: %s', cachedir)
for msg in os.listdir(cachedir):
with open(os.path.join(cachedir, msg), 'rb') as fh:
msgs.append(email.message_from_binary_file(fh, policy=emlpolicy))
return msgs
loc = urllib.parse.urlparse(query_url)
logger.info('Grabbing search results from %s', loc.netloc)
session = get_requests_session()
# For the query to retrieve a mbox file, we need to send a POST request
resp = session.post(query_url, data='')
if resp.status_code == 404:
logger.info('Nothing matching that query.')
return None
if resp.status_code != 200:
logger.info('Server returned an error: %s', resp.status_code)
return None
t_mbox = gzip.decompress(resp.content)
resp.close()
if not len(t_mbox):
logger.critical('No messages found for that query')
return None
return split_and_dedupe_pi_results(t_mbox, cachedir=cachedir)
def split_and_dedupe_pi_results(t_mbox: bytes, cachedir: Optional[str] = None) -> List[email.message.Message]:
# Convert into individual files using git-mailsplit
with tempfile.TemporaryDirectory(suffix='-mailsplit') as tfd:
msgs = mailsplit_bytes(t_mbox, tfd)
deduped = dict()
for msg in msgs:
msgid = LoreMessage.get_clean_msgid(msg)
if msgid in deduped:
deduped[msgid] = LoreMessage.get_preferred_duplicate(deduped[msgid], msg)
continue
deduped[msgid] = msg
msgs = list(deduped.values())
if cachedir:
if os.path.exists(cachedir):
shutil.rmtree(cachedir)
pathlib.Path(cachedir).mkdir(parents=True, exist_ok=True)
for at, msg in enumerate(msgs):
with open(os.path.join(cachedir, '%04d' % at), 'wb') as fh:
fh.write(msg.as_bytes(policy=emlpolicy))
return msgs
def get_pi_thread_by_url(t_mbx_url: str, nocache: bool = False):
msgs = list()
cachedir = get_cache_file(t_mbx_url, 'pi.msgs')
if os.path.exists(cachedir) and not nocache:
logger.debug('Using cached copy: %s', cachedir)
for msg in os.listdir(cachedir):
with open(os.path.join(cachedir, msg), 'rb') as fh:
msgs.append(email.message_from_binary_file(fh, policy=emlpolicy))
return msgs
logger.critical('Grabbing thread from %s', t_mbx_url.split('://')[1])
session = get_requests_session()
resp = session.get(t_mbx_url)
if resp.status_code == 404:
logger.critical('That message-id is not known.')
return None
if resp.status_code != 200:
logger.critical('Server returned an error: %s', resp.status_code)
return None
t_mbox = gzip.decompress(resp.content)
resp.close()
if not len(t_mbox):
logger.critical('No messages found for that query')
return None
return split_and_dedupe_pi_results(t_mbox, cachedir=cachedir)
def get_pi_thread_by_msgid(msgid: str, nocache: bool = False,
onlymsgids: Optional[set] = None) -> Optional[list]:
qmsgid = urllib.parse.quote_plus(msgid)
config = get_main_config()
loc = urllib.parse.urlparse(config['midmask'])
# The public-inbox instance may provide a unified index at /all/.
# In fact, /all/ naming is arbitrary, but for now we are going to
# hardcode it to lore.kernel.org settings and maybe make it configurable
# in the future, if necessary.
if loc.path.startswith('/all/'):
projurl = '%s://%s/all' % (loc.scheme, loc.netloc)
else:
# Grab the head from lore, to see where we are redirected
midmask = config['midmask'] % qmsgid
logger.info('Looking up %s', midmask)
session = get_requests_session()
resp = session.head(midmask)
        if resp.status_code < 300 or resp.status_code >= 400:
logger.critical('That message-id is not known.')
return None
# Pop msgid from the end of the redirect
chunks = resp.headers['Location'].rstrip('/').split('/')
projurl = '/'.join(chunks[:-1])
resp.close()
t_mbx_url = '%s/%s/t.mbox.gz' % (projurl, qmsgid)
logger.debug('t_mbx_url=%s', t_mbx_url)
msgs = get_pi_thread_by_url(t_mbx_url, nocache=nocache)
if not msgs:
return None
if onlymsgids:
strict = list()
        for msg in msgs:
            if LoreMessage.get_clean_msgid(msg) in onlymsgids:
                strict.append(msg)
                continue
            # also grab any messages where this msgid is in the references header
            for onlymsgid in onlymsgids:
                if msg.get('references', '').find(onlymsgid) >= 0:
                    strict.append(msg)
                    break
else:
strict = get_strict_thread(msgs, msgid)
return strict
def git_range_to_patches(gitdir: Optional[str], start: str, end: str,
prefixes: Optional[List[str]] = None,
revision: Optional[int] = 1,
msgid_tpt: Optional[str] = None,
seriests: Optional[int] = None,
mailfrom: Optional[Tuple[str, str]] = None,
extrahdrs: Optional[List[Tuple[str, str]]] = None,
ignore_commits: Optional[Set[str]] = None) -> List[Tuple[str, email.message.Message]]:
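    """Convert a commit range into a list of (commit, message) patch tuples.

    Subjects are rebuilt with the given prefixes and revision; mailfrom
    moves the original From into the body, seriests sets sequential Date
    headers, extrahdrs and msgid_tpt add or replace headers, and commits
    listed in ignore_commits are skipped.
    """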
commits = git_get_command_lines(gitdir, ['rev-list', '--reverse', f'{start}..{end}'])
if not commits:
raise RuntimeError(f'Could not run rev-list {start}..{end}')
if ignore_commits is None:
ignore_commits = set()
# Go through them once to drop ignored commits and get bodies
patches = list()
for commit in commits:
if commit in ignore_commits:
logger.debug('Ignoring commit %s', commit)
continue
ecode, out = git_run_command(gitdir, ['show', '--format=email', '--patch-with-stat', '--encoding=utf-8',
commit], decode=False)
if ecode > 0:
raise RuntimeError(f'Could not get a patch out of {commit}')
msg = email.message_from_bytes(out, policy=emlpolicy)
patches.append((commit, msg))
fullcount = len(patches)
if fullcount == 0:
        raise RuntimeError(f'No commits left to convert in range {start}..{end}')
vlines = git_get_command_lines(None, ['--version'])
if len(vlines) == 1:
gitver = vlines[0].split()[-1]
else:
gitver = None
expected = len(patches)
for counter, (commit, msg) in enumerate(patches):
msg.set_charset('utf-8')
# Clean From to remove any 7bit-safe encoding
origfrom = LoreMessage.clean_header(msg.get('From'))
lsubject = LoreSubject(msg.get('Subject'))
lsubject.counter = counter + 1
lsubject.expected = expected
lsubject.revision = revision
subject = lsubject.get_rebuilt_subject(eprefixes=prefixes)
logger.debug(' %s', subject)
msg.replace_header('Subject', subject)
inbodyhdrs = list()
setfrom = origfrom
if mailfrom:
# Move the original From and Date into the body
origpair = email.utils.parseaddr(origfrom)
if origpair[1] != mailfrom[1]:
setfrom = format_addrs([mailfrom])
inbodyhdrs.append(f'From: {origfrom}')
msg.replace_header('From', setfrom)
if seriests:
patchts = seriests + counter + 1
origdate = msg.get('Date')
if origdate:
msg.replace_header('Date', email.utils.formatdate(patchts, localtime=True))
else:
msg.add_header('Date', email.utils.formatdate(patchts, localtime=True))
payload = msg.get_payload(decode=True)
if isinstance(payload, bytes):
payload = payload.decode()
if inbodyhdrs:
payload = '\n'.join(inbodyhdrs) + '\n\n' + payload
        if gitver and '\n-- \n' not in payload:
payload += f'\n-- \n{gitver}\n'
msg.set_payload(payload, charset='utf-8')
if extrahdrs is None:
extrahdrs = list()
for hdrname, hdrval in extrahdrs:
try:
msg.replace_header(hdrname, hdrval)
except KeyError:
msg.add_header(hdrname, hdrval)
if msgid_tpt:
msg.add_header('Message-Id', msgid_tpt % str(lsubject.counter))
return patches
def git_commit_exists(gitdir: Optional[str], commit_id: str) -> bool:
gitargs = ['cat-file', '-e', commit_id]
ecode, out = git_run_command(gitdir, gitargs)
return ecode == 0
def git_branch_exists(gitdir: Optional[str], branch_name: str):
gitargs = ['rev-parse', branch_name]
ecode, out = git_run_command(gitdir, gitargs)
return ecode == 0
def git_revparse_tag(gitdir: Optional[str], tagname: str) -> Optional[str]:
if not tagname.startswith('refs/tags/'):
fulltag = f'refs/tags/{tagname}'
else:
fulltag = tagname
gitargs = ['rev-parse', fulltag]
ecode, out = git_run_command(gitdir, gitargs)
if ecode > 0:
return None
return out.strip()
def git_branch_contains(gitdir: Optional[str], commit_id: str, checkall: bool = False) -> List[str]:
gitargs = ['branch', '--format=%(refname:short)', '--contains', commit_id]
if checkall:
gitargs.append('--all')
lines = git_get_command_lines(gitdir, gitargs)
return lines
def git_get_toplevel(path: Optional[str] = None) -> Optional[str]:
topdir = None
# Are we in a git tree and if so, what is our toplevel?
gitargs = ['rev-parse', '--show-toplevel']
lines = git_get_command_lines(path, gitargs)
if len(lines) == 1:
topdir = lines[0]
return topdir
def format_addrs(pairs, clean=True):
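    """Format (name, address) pairs into a header-ready string, quoting
    names that contain special characters (see cpython issue 100900)."""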
addrs = list()
for pair in pairs:
if pair[0] == pair[1]:
addrs.append(pair[1])
continue
if clean:
# Remove any quoted-printable header junk from the name
pair = (LoreMessage.clean_header(pair[0]), pair[1])
# Work around https://github.com/python/cpython/issues/100900
if not pair[0].startswith('=?') and not pair[0].startswith('"') and qspecials.search(pair[0]):
quoted = email.utils.quote(pair[0])
addrs.append(f'"{quoted}" <{pair[1]}>')
continue
addrs.append(email.utils.formataddr(pair))
return ', '.join(addrs)
def make_quote(body, maxlines=5):
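    """Produce a quoted version of the message body, snipping it with
    '[...]' at the first paragraph break after maxlines quoted lines."""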
headers, message, trailers, basement, signature = LoreMessage.get_body_parts(body)
if not len(message):
# Sometimes there is no message, just trailers
return '> \n'
# Remove common greetings
message = re.sub(r'^(hi|hello|greetings|dear)\W.*\n+', '', message, flags=re.I)
quotelines = list()
qcount = 0
for line in message.split('\n'):
# Quote the first paragraph only and then [snip] if we quoted more than maxlines
if qcount > maxlines and not len(line.strip()):
quotelines.append('> ')
quotelines.append('> [...]')
break
quotelines.append('> %s' % line.rstrip())
qcount += 1
return '\n'.join(quotelines)
def parse_int_range(intrange, upper=None):
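    """Generate integers from a comma-separated range string.

    Understands single values, 'a-b' (inclusive), 'a-' (up to upper),
    and '<n' (everything below n); e.g. with upper=10,
    '1,3-5,<3,7-' yields 1, 3, 4, 5, 1, 2, 7, 8, 9, 10.
    """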
# Remove all whitespace
intrange = re.sub(r'\s', '', intrange)
for n in intrange.split(','):
if n.isdigit():
yield int(n)
elif n.find('<') == 0 and len(n) > 1 and n[1:].isdigit():
yield from range(1, int(n[1:]))
elif n.find('-') > 0:
nr = n.split('-')
if nr[0].isdigit() and nr[1].isdigit():
yield from range(int(nr[0]), int(nr[1])+1)
elif not len(nr[1]) and nr[0].isdigit() and upper:
yield from range(int(nr[0]), upper+1)
else:
logger.critical('Unknown range value specified: %s', n)
def check_gpg_status(status: str) -> Tuple[bool, bool, bool, Optional[str], Optional[str]]:
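    """Parse gpg --status-fd output.

    Returns a (good, valid, trusted, keyid, signtime) tuple, where signtime
    is the epoch timestamp reported by VALIDSIG, if any.
    """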
good = False
valid = False
trusted = False
keyid = None
signtime = None
# Do we have a BADSIG?
bs_matches = re.search(r'^\[GNUPG:] BADSIG ([\dA-F]+)\s+(.*)$', status, flags=re.M)
if bs_matches:
keyid = bs_matches.groups()[0]
return good, valid, trusted, keyid, signtime
gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([\dA-F]+)\s+(.*)$', status, flags=re.M)
if gs_matches:
good = True
keyid = gs_matches.groups()[0]
vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([\dA-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
if vs_matches:
valid = True
signtime = vs_matches.groups()[2]
ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M)
if ts_matches:
trusted = True
return good, valid, trusted, keyid, signtime
def get_gpg_uids(keyid: str) -> list:
gpgargs = ['--with-colons', '--list-keys', keyid]
ecode, out, err = gpg_run_command(gpgargs)
if ecode > 0:
raise KeyError('Unable to get UIDs list matching key %s' % keyid)
keyinfo = out.decode()
uids = list()
for line in keyinfo.split('\n'):
if line[:4] != 'uid:':
continue
chunks = line.split(':')
if chunks[1] in ('r',):
# Revoked UID, ignore
continue
uids.append(chunks[9])
return uids
def save_git_am_mbox(msgs: List[email.message.Message], dest: BinaryIO):
    # Git-am has its own understanding of the "mbox" format that differs from Python's
    # mboxo implementation. Specifically, it never escapes the ">From " lines found in bodies
    # unless invoked with --patch-format=mboxrd (this is wrong, because ">From " escapes are also
    # required in the original mbox "mboxo" format).
    # So, save in the format that git-am expects.
for msg in msgs:
dest.write(b'From git@z Thu Jan 1 00:00:00 1970\n')
dest.write(LoreMessage.get_msg_as_bytes(msg, headers='decode'))
def save_mboxrd_mbox(msgs: List[email.message.Message], dest: BinaryIO, mangle_from: bool = False):
gen = email.generator.BytesGenerator(dest, mangle_from_=mangle_from, policy=emlpolicy)
for msg in msgs:
dest.write(b'From mboxrd@z Thu Jan 1 00:00:00 1970\n')
gen.flatten(msg)
def save_maildir(msgs: list, dest):
d_new = os.path.join(dest, 'new')
pathlib.Path(d_new).mkdir(parents=True)
d_cur = os.path.join(dest, 'cur')
pathlib.Path(d_cur).mkdir(parents=True)
d_tmp = os.path.join(dest, 'tmp')
pathlib.Path(d_tmp).mkdir(parents=True)
for msg in msgs:
# make a slug out of it
lsubj = LoreSubject(msg.get('subject', ''))
slug = '%04d_%s' % (lsubj.counter, re.sub(r'\W+', '_', lsubj.subject).strip('_').lower())
with open(os.path.join(d_tmp, f'{slug}.eml'), 'wb') as mfh:
mfh.write(LoreMessage.get_msg_as_bytes(msg, headers='decode'))
os.rename(os.path.join(d_tmp, f'{slug}.eml'), os.path.join(d_new, f'{slug}.eml'))
def get_mailinfo(bmsg: bytes, scissors: bool = False) -> Tuple[dict, bytes, bytes]:
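    """Run the message through git-mailinfo.

    Returns a tuple of (info, message, patch), where info is the dict of
    headers reported by git-mailinfo (Author, Email, Subject, Date),
    message is the commit message body, and patch is the patch content.
    """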
with tempfile.TemporaryDirectory() as tfd:
m_out = os.path.join(tfd, 'm')
p_out = os.path.join(tfd, 'p')
if scissors:
cmdargs = ['mailinfo', '--encoding=UTF-8', '--scissors', m_out, p_out]
else:
cmdargs = ['mailinfo', '--encoding=UTF-8', '--no-scissors', m_out, p_out]
ecode, info = git_run_command(None, cmdargs, bmsg)
if not len(info.strip()):
raise ValueError('Could not get mailinfo')
i = dict()
m = b''
p = b''
for line in info.split('\n'):
line = line.strip()
if not line:
continue
chunks = line.split(':', 1)
i[chunks[0]] = chunks[1].strip()
with open(m_out, 'rb') as mfh:
m = mfh.read()
with open(p_out, 'rb') as pfh:
p = pfh.read()
return i, m, p
def read_template(tptfile):
# bubbles up FileNotFound
tpt = ''
if tptfile.find('~') >= 0:
tptfile = os.path.expanduser(tptfile)
if tptfile.find('$') >= 0:
tptfile = os.path.expandvars(tptfile)
with open(tptfile, 'r', encoding='utf-8') as fh:
for line in fh:
if len(line) and line[0] == '#':
continue
tpt += line
return tpt
def get_sendemail_config() -> dict:
global SENDEMAIL_CONFIG
if SENDEMAIL_CONFIG is None:
# Get the default settings first
config = get_main_config()
identity = config.get('sendemail-identity')
_basecfg = get_config_from_git(r'sendemail\.[^.]+$')
if identity:
# Use this identity to override what we got from the default one
sconfig = get_config_from_git(rf'sendemail\.{identity}\..*', defaults=_basecfg)
sectname = f'sendemail.{identity}'
if not len(sconfig):
raise smtplib.SMTPException('Unable to find %s settings in any applicable git config' % sectname)
else:
sconfig = _basecfg
sectname = 'sendemail'
logger.debug('Using values from %s', sectname)
SENDEMAIL_CONFIG = sconfig
return SENDEMAIL_CONFIG
def get_smtp(dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, list, None], str]:
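    """Get an smtp connection (or the sendmail command to use), plus the from address.

    Returns a (smtp, fromaddr) tuple, where smtp is an smtplib connection,
    a list of command arguments if sendemail.smtpserver is a local command,
    or None when dryrun is requested.
    """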
sconfig = get_sendemail_config()
# Limited support for smtp settings to begin with, but should cover the vast majority of cases
fromaddr = sconfig.get('from')
if not fromaddr:
# We fall back to user.email
usercfg = get_user_config()
fromaddr = usercfg['email']
server = sconfig.get('smtpserver', 'localhost')
port = sconfig.get('smtpserverport', 0)
try:
port = int(port)
except ValueError:
        raise smtplib.SMTPException('Invalid smtpserverport entry in config')
# If server contains slashes, then it's a local command
if '/' in server:
server = os.path.expanduser(os.path.expandvars(server))
sp = shlex.shlex(server, posix=True)
sp.whitespace_split = True
smtp = list(sp)
if '-i' not in smtp:
smtp.append('-i')
# Do we have the envelopesender defined?
env_sender = sconfig.get('envelopesender', '')
if env_sender:
envpair = email.utils.parseaddr(env_sender)
else:
envpair = email.utils.parseaddr(fromaddr)
if envpair[1]:
smtp += ['-f', envpair[1]]
return smtp, fromaddr
encryption = sconfig.get('smtpencryption')
if dryrun:
return None, fromaddr
logger.info('Connecting to %s:%s', server, port)
# We only authenticate if we have encryption
if encryption:
if encryption in ('tls', 'starttls'):
            # We do starttls
smtp = smtplib.SMTP(server, port)
# Introduce ourselves
smtp.ehlo()
# Start encryption
smtp.starttls()
# Introduce ourselves again to get new criteria
smtp.ehlo()
elif encryption in ('ssl', 'smtps'):
# We do TLS from the get-go
smtp = smtplib.SMTP_SSL(server, port)
else:
raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption)
# If we got to this point, we should do authentication.
auser = sconfig.get('smtpuser')
apass = sconfig.get('smtppass')
if auser and not apass:
# Try with git-credential-helper
if port:
gchost = f'{server}:{port}'
else:
gchost = server
apass = git_credential_fill(None, protocol='smtp', host=gchost, username=auser)
if not apass:
            raise smtplib.SMTPException('No password specified for connecting to %s' % server)
if auser and apass:
# Let any exceptions bubble up
smtp.login(auser, apass)
else:
# We assume you know what you're doing if you don't need encryption
smtp = smtplib.SMTP(server, port)
return smtp, fromaddr
def get_patchwork_session(pwkey: str, pwurl: str) -> Tuple[requests.Session, str]:
session = requests.session()
session.headers.update({
'User-Agent': 'b4/%s' % __VERSION__,
'Authorization': f'Token {pwkey}',
})
url = '/'.join((pwurl.rstrip('/'), 'api', PW_REST_API_VERSION))
logger.debug('pw url=%s', url)
return session, url
def patchwork_set_state(msgids: List[str], state: str) -> bool:
# Do we have a pw-key defined in config?
config = get_main_config()
pwkey = config.get('pw-key')
pwurl = config.get('pw-url')
pwproj = config.get('pw-project')
if not (pwkey and pwurl and pwproj):
logger.debug('Patchwork support requires pw-key, pw-url and pw-project settings')
return False
pses, url = get_patchwork_session(pwkey, pwurl)
patches_url = '/'.join((url, 'patches'))
tochange = list()
seen = set()
for msgid in msgids:
if msgid in seen:
continue
# Two calls, first to look up the patch-id, second to update its state
params = [
('project', pwproj),
('archived', 'false'),
('msgid', msgid),
]
try:
logger.debug('looking up patch_id of msgid=%s', msgid)
rsp = pses.get(patches_url, params=params, stream=False)
rsp.raise_for_status()
pdata = rsp.json()
for entry in pdata:
patch_id = entry.get('id')
if patch_id:
title = entry.get('name')
if entry.get('state') != state:
seen.add(msgid)
tochange.append((patch_id, title))
except requests.exceptions.RequestException as ex:
logger.debug('Patchwork REST error: %s', ex)
if tochange:
logger.info('---')
loc = urllib.parse.urlparse(pwurl)
logger.info('Patchwork: setting state on %s/%s', loc.netloc, pwproj)
for patch_id, title in tochange:
patchid_url = '/'.join((patches_url, str(patch_id), ''))
logger.debug('patchid_url=%s', patchid_url)
data = [
('state', state),
]
try:
rsp = pses.patch(patchid_url, data=data, stream=False)
rsp.raise_for_status()
newdata = rsp.json()
if newdata.get('state') == state:
logger.info(' -> %s : %s', state, title)
            except requests.exceptions.RequestException as ex:
                logger.debug('Patchwork REST error: %s', ex)
    # We are declared to return bool, so signal that we attempted
    # all the updates we could
    return True
def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[email.message.Message],
fromaddr: Optional[str], destaddrs: Optional[Union[set, list]] = None,
patatt_sign: bool = False, dryrun: bool = False,
output_dir: Optional[str] = None, web_endpoint: Optional[str] = None,
reflect: bool = False) -> Optional[int]:
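    """Send (or save, print, or reflect) a list of messages.

    Depending on the arguments, messages are patatt-signed, written out as
    .eml files into output_dir, displayed (dryrun), posted to a web
    endpoint, or sent via the smtp connection or local command returned by
    get_smtp(). Returns the number of messages sent, or 0/None on failure.
    """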
tosend = list()
if output_dir is not None:
dryrun = True
for msg in msgs:
if not msg.get('X-Mailer'):
msg.add_header('X-Mailer', f'b4 {__VERSION__}')
msg.set_charset('utf-8')
if dryrun or web_endpoint:
nl = '\n'
else:
nl = '\r\n'
bdata = LoreMessage.get_msg_as_bytes(msg, nl=nl, headers='encode')
subject = msg.get('Subject', '')
ls = LoreSubject(subject)
if patatt_sign:
import patatt
# patatt.logger = logger
try:
bdata = patatt.rfc2822_sign(bdata)
except patatt.NoKeyError as ex:
logger.critical('CRITICAL: Error signing: no key configured')
logger.critical(' Run "patatt genkey" or configure "user.signingKey" to use PGP')
logger.critical(' As a last resort, rerun with --no-sign')
raise RuntimeError(str(ex))
except patatt.SigningError as ex:
raise RuntimeError('Failure trying to patatt-sign: %s' % str(ex))
if dryrun:
if output_dir:
filen = '%s.eml' % ls.get_slug(sep='-')
logger.info(' %s', filen)
write_to = os.path.join(output_dir, filen)
with open(write_to, 'wb') as fh:
fh.write(bdata)
continue
logger.info(' --- DRYRUN: message follows ---')
logger.info(' | ' + bdata.decode().rstrip().replace('\n', '\n | '))
logger.info(' --- DRYRUN: message ends ---')
continue
if not destaddrs:
alldests = email.utils.getaddresses([str(x) for x in msg.get_all('to', [])])
alldests += email.utils.getaddresses([str(x) for x in msg.get_all('cc', [])])
myaddrs = {x[1] for x in alldests}
else:
myaddrs = destaddrs
tosend.append((myaddrs, bdata, ls))
if not len(tosend):
return 0
logger.info('---')
if web_endpoint:
if reflect:
logger.info('Reflecting via web endpoint %s', web_endpoint)
wpaction = 'reflect'
else:
logger.info('Sending via web endpoint %s', web_endpoint)
wpaction = 'receive'
req = {
'action': wpaction,
'messages': [x[1].decode() for x in tosend],
}
ses = get_requests_session()
res = ses.post(web_endpoint, json=req)
try:
rdata = res.json()
if rdata.get('result') == 'success':
return len(tosend)
except Exception as ex: # noqa
logger.critical('Odd response from the endpoint: %s', res.text)
return 0
if rdata.get('result') == 'error':
logger.critical('Error from endpoint: %s', rdata.get('message'))
return 0
sent = 0
envpair = email.utils.parseaddr(fromaddr)
if isinstance(smtp, list):
# This is a local command
if reflect:
logger.info('Reflecting via "%s"', ' '.join(smtp))
else:
logger.info('Sending via "%s"', ' '.join(smtp))
for destaddrs, bdata, lsubject in tosend:
logger.info(' %s', lsubject.full_subject)
if reflect:
cmdargs = list(smtp) + [envpair[1]]
else:
cmdargs = list(smtp) + list(destaddrs)
ecode, out, err = _run_command(cmdargs, stdin=bdata)
if ecode > 0:
raise RuntimeError('Error running %s: %s' % (' '.join(smtp), err.decode()))
sent += 1
elif smtp:
for destaddrs, bdata, lsubject in tosend:
# Force compliant eols
bdata = re.sub(rb'\r\n|\n|\r(?!\n)', b'\r\n', bdata)
logger.info(' %s', lsubject.full_subject)
if reflect:
smtp.sendmail(fromaddr, [envpair[1]], bdata)
else:
smtp.sendmail(fromaddr, destaddrs, bdata)
sent += 1
return sent
def git_get_current_branch(gitdir: Optional[str] = None, short: bool = True) -> Optional[str]:
gitargs = ['symbolic-ref', '-q', 'HEAD']
ecode, out = git_run_command(gitdir, gitargs)
if ecode > 0:
logger.critical('Not able to get current branch (git symbolic-ref HEAD)')
return None
mybranch = out.strip()
if short:
return re.sub(r'^refs/heads/', '', mybranch)
return mybranch
def get_excluded_addrs() -> Set[str]:
config = get_main_config()
excludes = set()
c_excludes = config.get('email-exclude')
if c_excludes:
for entry in c_excludes.split(','):
excludes.add(entry.strip())
return excludes
def cleanup_email_addrs(addresses: List[Tuple[str, str]], excludes: Set[str],
gitdir: Optional[str]) -> List[Tuple[str, str]]:
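    """Clean up a list of (name, address) pairs.

    Drops unqualified addresses and any matching the exclude patterns, and
    replaces entries according to git mailmap (results are cached in
    MAILMAP_INFO).
    """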
global MAILMAP_INFO
for entry in list(addresses):
# Only qualified addresses, please
if not len(entry[1].strip()) or '@' not in entry[1]:
addresses.remove(entry)
continue
# Check if it's in excludes
removed = False
for exclude in excludes:
if fnmatch.fnmatch(entry[1], exclude):
logger.debug('Removed %s due to matching %s', entry[1], exclude)
addresses.remove(entry)
removed = True
break
if removed:
continue
# Check if it's mailmap-replaced
if entry[1] in MAILMAP_INFO:
if MAILMAP_INFO[entry[1]]:
addresses.remove(entry)
addresses.append(MAILMAP_INFO[entry[1]])
continue
logger.debug('Checking if %s is mailmap-replaced', entry[1])
args = ['check-mailmap', f'<{entry[1]}>']
ecode, out = git_run_command(gitdir, args)
if ecode != 0:
MAILMAP_INFO[entry[1]] = None
continue
replacement = email.utils.getaddresses([out.strip()])
if len(replacement) == 1:
if entry[1] == replacement[0][1]:
MAILMAP_INFO[entry[1]] = None
continue
logger.debug('Replaced %s with mailmap-updated %s', entry[1], replacement[0][1])
MAILMAP_INFO[entry[1]] = replacement[0]
addresses.remove(entry)
addresses.append(replacement[0])
return addresses
def get_email_signature() -> str:
usercfg = get_user_config()
# Do we have a .signature file?
sigfile = os.path.join(str(Path.home()), '.signature')
if os.path.exists(sigfile):
with open(sigfile, 'r', encoding='utf-8') as fh:
signature = fh.read()
else:
signature = '%s <%s>' % (usercfg['name'], usercfg['email'])
return signature
def retrieve_messages(cmdargs: argparse.Namespace) -> Tuple[Optional[str], Optional[list]]:
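    """Retrieve the msgid and thread messages indicated by cmdargs.

    The thread comes either from a public-inbox server (looked up by
    message-id) or, when cmdargs.localmbox is set, from a local mbox file,
    maildir, or stdin. Returns a (msgid, msgs) tuple.
    """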
msgid = None
if not cmdargs.localmbox:
if not can_network:
raise LookupError('Cannot retrieve threads from remote in offline mode')
msgid = get_msgid(cmdargs)
if not msgid:
raise LookupError('Pipe a message or pass msgid as parameter')
pickings = set()
if 'cherrypick' in cmdargs and cmdargs.cherrypick == '_':
# Just that msgid, please
pickings = {msgid}
msgs = get_pi_thread_by_msgid(msgid, nocache=cmdargs.nocache, onlymsgids=pickings)
if not msgs:
return None, msgs
else:
if cmdargs.localmbox == '-':
# The entire mbox is passed via stdin, so mailsplit it and use the first message for our msgid
with tempfile.TemporaryDirectory() as tfd:
msgs = mailsplit_bytes(sys.stdin.buffer.read(), tfd, pipesep=cmdargs.stdin_pipe_sep)
if not len(msgs):
raise LookupError('Stdin did not contain any messages')
elif os.path.exists(cmdargs.localmbox):
msgid = get_msgid(cmdargs)
if os.path.isdir(cmdargs.localmbox):
in_mbx = mailbox.Maildir(cmdargs.localmbox)
else:
in_mbx = mailbox.mbox(cmdargs.localmbox)
if msgid:
msgs = get_strict_thread(in_mbx, msgid)
if not msgs:
raise LookupError('Could not find %s in %s' % (msgid, cmdargs.localmbox))
else:
msgs = in_mbx
else:
raise LookupError('Mailbox %s does not exist' % cmdargs.localmbox)
if msgid and 'noparent' in cmdargs and cmdargs.noparent:
msgs = get_strict_thread(msgs, msgid, noparent=True)
if not msgid and msgs:
for msg in msgs:
msgid = msg.get('Message-ID', None)
if msgid:
msgid = msgid.strip('<>')
break
return msgid, msgs
def git_revparse_obj(gitobj: str) -> str:
ecode, out = git_run_command(None, ['rev-parse', gitobj])
if ecode > 0:
raise RuntimeError('No such object: %s' % gitobj)
return out.strip()