blob: 522b4a4aabcabe825527e55abdd59f679b3b4790 [file] [log] [blame]
#!/usr/bin/env python3
import email
import email.message
import email.utils
import email.parser
import requests
import logging
import base64
import b4
import sys
import os
import uuid
import datetime
import gzip
import urllib.parse
import sqlalchemy as sa
from typing import Dict, Tuple, List, Set, Optional
from fnmatch import fnmatch
from string import Template
from sqlalchemy.exc import NoSuchTableError
# Policy we use for saving mail locally
# NOTE(review): email.policy is not imported explicitly in this file; this line
# presumably relies on another import having loaded the submodule -- confirm.
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None,
                                     message_factory=email.message.EmailMessage)
# force b4 to use EmailMessage factory
b4.emlpolicy = emlpolicy
# Shared requests session, created on first use by get_requests_session()
REQSESSION = None
# Runtime configuration dict, returned as-is by get_config()
CONFIG = dict()
__APPNAME__ = 'bugspray'
__VERSION__ = '0.1-dev'
# Value stored under the 'schema' meta key when a fresh SQLite database is set up
__DBSCHEMA__ = 1
logger = logging.getLogger(__APPNAME__)
# Replace b4's module-level logger with ours
b4.logger = logger
# SQLAlchemy connection and engine, created on first use by db_get_sa()
SACONN = None
SAENGINE = None
# Responses of parameter-less GET requests keyed by REST path, managed by bz_rest()
REST_CACHE = dict()
# Parsed maintainers data keyed by subsystem heading, filled by get_maintainer_data()
MAINT_CACHE = dict()
def get_msg_from_stdin() -> Optional[email.message.EmailMessage]:
    """Parse a message piped in on standard input.

    Returns None when stdin is attached to a terminal (nothing was piped in).
    """
    if sys.stdin.isatty():
        return None
    parser = email.parser.BytesParser(policy=emlpolicy)
    raw = sys.stdin.buffer.read()
    return parser.parsebytes(raw)  # noqa
def get_requests_session():
    """Return the shared requests session, creating it on first use.

    The session is stored in the module-level REQSESSION and carries a
    User-Agent header built from the application name and version.
    """
    global REQSESSION
    if REQSESSION is None:
        user_agent = f'{__APPNAME__}/{__VERSION__}'
        REQSESSION = requests.session()
        REQSESSION.headers.update({'User-Agent': user_agent})
    return REQSESSION
def get_config() -> Dict:
    """Return the module-level configuration dict (the same object, not a copy)."""
    return CONFIG
def get_component_config(product: str, component: str) -> Dict:
    """Return the config section for product/component, or an empty dict if there is none."""
    try:
        cconf = get_config()['components'][product][component]
    except KeyError:
        cconf = dict()
    return cconf
def get_template_by_bid(tptname: str, bid: int) -> Template:
    """Return the template named tptname for the product/component of bug bid."""
    prodcomp = bz_get_product_component_by_bid(bid)
    return get_template_by_product_component(tptname, *prodcomp)
def get_template_by_product_component(tptname: str, product: str, component: str) -> Template:
    """Return the template tptname, preferring a component-level override.

    Falls back to the global ``templates`` section when the component does not
    define the template; a missing global template raises KeyError.
    """
    config = get_config()
    try:
        override = config['components'][product][component]['templates'][tptname]
    except KeyError:
        pass
    else:
        return Template(override)
    return Template(config['templates'][tptname])
def get_msgid_link(msgid: str) -> str:
    """Build a link to msgid using the configured ``bugzilla.linkmask``.

    The message-id is URL-quoted (keeping '@' literal) before being
    substituted for ``{msgid}`` in the mask.
    """
    bzconf = get_config()['bugzilla']
    mask = bzconf.get('linkmask', 'https://lore.kernel.org/{msgid}')
    quoted = urllib.parse.quote_plus(msgid, safe='@')
    return mask.format(msgid=quoted)
def get_pi_search(title: str) -> dict:
    """Return the settings for the public-inbox search called title.

    When the search has an ``inherits`` key naming a different search, the
    parent's settings are resolved first and this search's own keys are
    layered on top.

    The result is always a fresh dict: merging must not write the child's
    values back into the parent's entry in the shared configuration, otherwise
    every later lookup of the parent would see the child's overrides.

    Raises:
        LookupError: if no search with this title is configured.
    """
    config = get_config()
    try:
        pconf = config['publicinbox']['searches'][title]
    except KeyError:
        raise LookupError('No such publicinbox search: %s' % title)
    if 'inherits' in pconf and title != pconf['inherits']:
        # Copy before merging so the parent's config entry stays untouched
        merged = dict(get_pi_search(pconf['inherits']))
        merged.update(pconf)
        return merged
    return dict(pconf)
def bz_rest(path: str, payload: Dict = None, params: Dict = None, method: str = 'GET') -> Dict:
    """Send a request to the Bugzilla REST API and return the decoded JSON reply.

    GET requests without params are served from and stored into REST_CACHE;
    any non-GET request empties that cache. The API key comes from
    ``bugzilla.apikey`` or, failing that, the BZAPIKEY environment variable.

    Raises:
        LookupError: when Bugzilla answers 404.
        RuntimeError: when method is not GET, POST or PUT.
        Other HTTP errors surface via ``raise_for_status()``.
    """
    global REST_CACHE
    # We only cache GETs without any params
    cacheable = method == 'GET' and not params
    if cacheable and path in REST_CACHE:
        logger.debug('Using cached data')
        return REST_CACHE[path]

    bzconf = get_config()['bugzilla']
    url = '%s/%s' % (bzconf.get('resturl').rstrip('/'), path)
    logger.debug('Querying url=%s', url)

    myparams = dict(params) if params is not None else dict()
    myparams['api_key'] = bzconf.get('apikey') or os.getenv('BZAPIKEY', '')

    ses = get_requests_session()
    if method == 'GET':
        res = ses.get(url, params=myparams)
    elif method in ('POST', 'PUT'):
        send = ses.post if method == 'POST' else ses.put
        res = send(url, params=myparams, json=payload)
    else:
        logger.critical('CRITICAL: Unknown method=%s', method)
        raise RuntimeError('Unknown method %s' % method)

    # logger.debug('res=%s', res.text)
    if res.status_code == 404:
        raise LookupError('Bugzilla returned 404: %s' % res.text)
    # for every other error, we just BT for now
    res.raise_for_status()
    rdata = res.json()

    if cacheable:
        # Cache it
        REST_CACHE[path] = rdata
    elif method != 'GET':
        # We changed something, so nuke our cache
        logger.debug('Clearing cache')
        REST_CACHE = dict()
    return rdata
def bz_add_atts_to_bug(bid: int, atts: List[Dict]) -> List[int]:
    """Upload each attachment payload in atts to bug bid.

    Each payload gets its ``ids`` key set to ``[bid]`` before it is posted.

    Returns:
        The attachment ids reported back for all uploads, in upload order.
    """
    aids = list()
    for att in atts:
        att['ids'] = [bid]
        rdata = bz_rest(f'bug/{bid}/attachment', att, method='POST')
        new_ids = rdata['ids']
        # The ids are declared as ints here, so stringify before joining;
        # str.join() raises TypeError on non-string items
        logger.debug('Created new attachment %s', ', '.join(str(x) for x in new_ids))
        aids += new_ids
    return aids
def bz_set_bug_status_resolution(bid: int, status: str, resolution: str) -> None:
    """Update the status and resolution of bug bid with a PUT request."""
    logger.debug('Setting bug_id=%s status=%s resolution=%s', bid, status, resolution)
    changes = dict(status=status, resolution=resolution)
    bz_rest(f'bug/{bid}', changes, method='PUT')
def bz_add_new_bug(payload: Dict) -> Tuple[int, int]:
    """Create a new bug from payload and return ``(bug_id, comment_id)``.

    ``version`` defaults to 'unspecified' and is written into the caller's
    payload dict when absent. The comment id is that of comment count 0.
    """
    payload.setdefault('version', 'unspecified')
    bid = bz_rest('bug', payload, method='POST')['id']
    logger.debug('Created new bug %s', bid)
    # Apparently, we don't get comment-id info from new bug creation
    return bid, bz_get_cid_by_bid_count(bid)
def bz_add_new_comment(bid: int, comment: str) -> int:
    """Post comment on bug bid and return the id of the new comment."""
    body = dict(id=bid, comment=comment)
    cid = bz_rest(f'bug/{bid}/comment', body, method='POST')['id']
    logger.debug('Created new comment %s', cid)
    return cid
def bz_get_bug(bid: int, resolve_dupes=False) -> Dict:
    """Fetch the data for bug bid.

    With resolve_dupes set, the duplicate chain is followed first and the
    final bug is returned instead.

    Raises:
        RuntimeError: if the reply does not contain the requested bug.
    """
    if resolve_dupes:
        bid = bz_dedupe_bid(bid)
    reply = bz_rest(f'bug/{bid}')
    found = next((entry for entry in reply['bugs'] if entry['id'] == bid), None)
    if found is None:
        raise RuntimeError('Could not get bug info for %s' % bid)
    return found
def bz_get_user(username: str) -> Dict:
    """Fetch the Bugzilla user record whose ``name`` equals username.

    Raises:
        LookupError: if the lookup fails or no matching record is returned.
    """
    found = None
    try:
        # Keep the whole scan inside the try: a missing key raises KeyError,
        # which is a LookupError and is treated as "not found"
        for candidate in bz_rest(f'user/{username}')['users']:
            if candidate['name'] == username:
                found = candidate
                break
    except LookupError:
        pass
    if found is None:
        raise LookupError('Could not get user info for %s' % username)
    return found
def bz_get_user_groups(username: str) -> Set:
    """Return the names of all groups listed in username's Bugzilla record."""
    groups = bz_get_user(username)['groups']
    return {group['name'] for group in groups}
def bz_get_cid_by_bid_count(bid: int, count: int = 0) -> int:
    """Translate a per-bug comment number (count) into its comment id.

    Raises:
        LookupError: if bug bid has no comment with that count.
    """
    reply = bz_rest(f'bug/{bid}/comment')
    for key, entry in reply['bugs'].items():
        if int(key) == bid:
            for comment in entry['comments']:
                if comment['count'] == count:
                    logger.debug('cid for %s/c%s is %s', bid, count, comment['id'])
                    return comment['id']
    raise LookupError('No cid matching bid=%s count=%s' % (bid, count))
def bz_get_count_by_bid_cid(bid: int, cid: int) -> int:
    """Translate a comment id into its per-bug comment number (count).

    Raises:
        LookupError: if bug bid has no comment with id cid.
    """
    bdata = bz_rest(f'bug/{bid}/comment')
    for rbid, rbdata in bdata['bugs'].items():
        if int(rbid) != bid:
            continue
        for comment in rbdata['comments']:
            if comment['id'] == cid:
                logger.debug('count for %s/%s is c%s', bid, cid, comment['count'])
                return comment['count']
    # Exceptions do not interpolate logging-style arguments, so the message
    # has to be formatted explicitly
    raise LookupError('No match for bid=%s cid=%s' % (bid, cid))
def bz_dedupe_bid(bid: int) -> int:
    """Follow ``dupe_of`` links from bug bid and return the final bug id."""
    while True:
        dupe_of = bz_get_bug(bid).get('dupe_of')
        if not dupe_of:
            return bid
        bid = dupe_of
def bz_check_user_allowed(uid: str, product: str, component: str) -> bool:
    """Check uid against the component's ``pi_must_bz_groups`` restriction.

    Returns True when the component defines no restriction, or when the user
    belongs to at least one of the listed groups; False when the user is
    unknown to Bugzilla or belongs to none of them.
    """
    mustgroups = get_component_config(product, component).get('pi_must_bz_groups')
    if mustgroups is None:
        # No restrictions, anyone can do anything they like
        return True
    try:
        udata = bz_get_user(uid)
    except LookupError:
        logger.debug('Could not find user %s in bugzilla', uid)
        return False
    for mustgroup in mustgroups:
        if any(ugroup['name'] == mustgroup for ugroup in udata.get('groups', list())):
            logger.debug('%s mustgroup matches %s', uid, mustgroup)
            return True
    logger.debug('%s not member of %s', uid, mustgroups)
    return False
def bz_assign_bug(bid: int, uid: str) -> None:
    """Set the ``assigned_to`` field of bug bid to uid."""
    logger.info('Assigning bug %s to %s', bid, uid)
    bz_rest(f'bug/{bid}', payload={'assigned_to': uid}, method='PUT')
def bz_set_bug_product_component(bid: int, product: str, component: str) -> None:
    """Move bug bid to the given product and component."""
    logger.info('Setting bug %s to product=%s, component=%s', bid, product, component)
    changes = dict(product=product, component=component)
    bz_rest(f'bug/{bid}', payload=changes, method='PUT')
def bz_set_bug_subsystem(bid: int, subsystem: str) -> None:
    """Store subsystem in the field named by ``bugzilla.subsystem_cf``.

    Does nothing (beyond logging) when no such field is configured.
    """
    field = get_config()['bugzilla'].get('subsystem_cf')
    if field:
        logger.info('Setting bug %s subsystem to %s', bid, subsystem)
        bz_rest(f'bug/{bid}', payload={field: subsystem}, method='PUT')
        return
    logger.info('No subsystem_cf defined for this bugzilla')
def db_get_query_last_check(product: str, component: str) -> str:
    """Return the stored last-check value for the product/component query.

    Raises LookupError (from db_get_meta_value) when nothing is stored yet.
    """
    return db_get_meta_value(f'query_{product}_{component}')
def db_store_query_last_check(product: str, component: str, last_check: str) -> None:
    """Store last_check as the last-check value for the product/component query."""
    return db_store_meta_value(f'query_{product}_{component}', last_check)
def db_get_msgid_by_bid_cid(bid: int, cid: Optional[int]) -> str:
    """Return the message-id mapped to bug bid and comment cid.

    With a falsy cid, the mapping with the lowest comment id is used.

    Raises:
        LookupError: if no mapping exists.
    """
    engine, dbconn = db_get_sa()
    t_bmap = sa.Table('msgid_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
    query = sa.select([t_bmap.c.message_id])
    if cid:
        query = query.where(t_bmap.c.bug_id == bid, t_bmap.c.comment_id == cid)
    else:
        # If cid is not defined, we get all mappings and use the lowest cid
        query = query.where(t_bmap.c.bug_id == bid).order_by(t_bmap.c.comment_id)
    rows = dbconn.execute(query).fetchall()
    if not len(rows):
        raise LookupError('No message-id matching bid=%s, cid=%s' % (bid, cid))
    logger.debug('query results for bid=%s, cid=%s: %s', bid, cid, rows)
    return rows[0][0]
def db_get_bid_cid_by_msgid(msgid: str) -> Tuple[int, Optional[int]]:
    """Look up the bug and comment a message-id is mapped to.

    If the mapped bug has since been marked as a duplicate, the bug it was
    deduplicated to is returned and the comment id is None, because the stored
    comment id belongs to the original bug and not to the duplicate target.

    Raises:
        LookupError: if msgid has no mapping.
    """
    engine, dbconn = db_get_sa()
    md = sa.MetaData()
    t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine)
    logger.debug('Querying db for msgid=%s', msgid)
    q = sa.select([t_bmap.c.bug_id, t_bmap.c.comment_id]).where(t_bmap.c.message_id == msgid)
    rp = dbconn.execute(q)
    fa = rp.fetchall()
    if not len(fa):
        raise LookupError('msgid %s not known' % msgid)
    bid, cid = fa[0]
    dbid = bz_dedupe_bid(bid)
    if dbid != bid:
        cid = None
    # Return the resolved bug id: handing back the original id together with
    # a cleared cid would point callers at the closed duplicate
    logger.debug(' matching bid=%s, cid=%s', dbid, cid)
    return dbid, cid
def db_store_msgid_bid_cid(msgid: str, bid: int, cid: int) -> None:
    """Record that message msgid corresponds to comment cid on bug bid.

    Surrounding angle brackets are stripped from msgid before it is stored.
    """
    clean_msgid = msgid.strip('<>')
    engine, dbconn = db_get_sa()
    t_bmap = sa.Table('msgid_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
    dbconn.execute(sa.insert(t_bmap).values(message_id=clean_msgid, bug_id=bid, comment_id=cid))
    logger.info('Created new mapping for %s: %s/%s', clean_msgid, bid, cid)
def db_get_recipients(bid: int) -> Set[str]:
    """Return the email addresses stored for bug bid.

    Raises:
        LookupError: if nothing is stored for this bug.
    """
    engine, dbconn = db_get_sa()
    t_recip = sa.Table('recipients', sa.MetaData(), autoload=True, autoload_with=engine)
    logger.debug('Querying recipients for bid=%s', bid)
    query = sa.select([t_recip.c.email]).where(t_recip.c.bug_id == bid)
    rows = dbconn.execute(query).fetchall()
    if not len(rows):
        raise LookupError('bid %s not known' % bid)
    return {row[0] for row in rows}
def db_get_bugs_for_commit(csha: str) -> Set[int]:
    """Return the ids of all bugs mapped to commit csha (empty set if none)."""
    engine, dbconn = db_get_sa()
    t_cmap = sa.Table('commit_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
    logger.debug('Querying for commit_id=%s', csha)
    query = sa.select([t_cmap.c.bug_id]).where(t_cmap.c.commit_id == csha)
    rows = dbconn.execute(query).fetchall()
    return {row[0] for row in rows}
def db_store_bug_for_commit(csha: str, bid: int) -> None:
    """Record that commit csha is associated with bug bid."""
    engine, dbconn = db_get_sa()
    t_cmap = sa.Table('commit_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
    logger.debug('Storing bug_id=%s for commit_id=%s', bid, csha)
    dbconn.execute(sa.insert(t_cmap).values(bug_id=bid, commit_id=csha))
def db_store_recipients(bid: int, recipients: Set[str]) -> None:
    """Add any addresses in recipients not yet stored for bug bid.

    Addresses already in the database are left alone; nothing is removed.
    """
    # TODO: add ability to unsubscribe?
    try:
        known = db_get_recipients(bid)
    except LookupError:
        known = set()
    # Any new ones to store?
    extras = recipients - known
    if extras:
        engine, dbconn = db_get_sa()
        t_recip = sa.Table('recipients', sa.MetaData(), autoload=True, autoload_with=engine)
        logger.debug('Storing new recipients for bid=%s', bid)
        for addr in extras:
            dbconn.execute(sa.insert(t_recip).values(bug_id=bid, email=addr))
            logger.debug(' Added %s', addr)
def db_get_meta_value(key: str) -> str:
    """Return the value stored under key in the ``meta`` table.

    Raises:
        LookupError: if the key is not present.
    """
    engine, dbconn = db_get_sa()
    t_meta = sa.Table('meta', sa.MetaData(), autoload=True, autoload_with=engine)
    query = sa.select([t_meta.c.var_value]).where(t_meta.c.var_key == key)
    rows = dbconn.execute(query).fetchall()
    if not len(rows):
        raise LookupError('meta key %s not known' % key)
    return rows[0][0]
def db_store_meta_value(key: str, value: str) -> None:
    """Store value under key in the ``meta`` table, replacing any previous value.

    The delete and the insert run inside one ``engine.begin()`` block.
    """
    engine, _ = db_get_sa()
    t_meta = sa.Table('meta', sa.MetaData(), autoload=True, autoload_with=engine)
    with engine.begin() as txconn:
        txconn.execute(sa.delete(t_meta).where(t_meta.c.var_key == key))
        txconn.execute(sa.insert(t_meta).values(var_key=key, var_value=value))
def db_store_notify_last_check(bid: int, when: str):
    """Store when as the last notification check value for bug bid."""
    return db_store_meta_value(f'notify_bug_{bid}', when)
def db_get_notify_last_check(bid: int) -> str:
    """Return the last notification check value stored for bug bid.

    Raises LookupError (from db_get_meta_value) when nothing is stored yet.
    """
    return db_get_meta_value(f'notify_bug_{bid}')
def bz_get_changed_bugs(since: str, include_untracked: bool = False) -> List:
    """Return bugs changed since the given value (sent as ``chfieldfrom``).

    Unless include_untracked is set, only bugs that have at least one row in
    the msgid_bug_mapping table are returned.
    """
    logger.debug('Querying for changed bugs since %s', since)
    reply = bz_rest('bug', params={
        'chfieldfrom': since,
        'include_fields': 'id,summary,product,component',
    })
    changed = reply['bugs']
    if include_untracked:
        return changed

    bids = tuple([entry['id'] for entry in changed])
    engine, dbconn = db_get_sa()
    t_bmap = sa.Table('msgid_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
    query = sa.select([t_bmap.c.bug_id]).where(t_bmap.c.bug_id.in_(bids)).distinct()
    rows = dbconn.execute(query).fetchall()
    tracked = {row[0] for row in rows}
    return [entry for entry in changed if entry['id'] in tracked]
def bz_quicksearch_bugs(query: str) -> Dict:
    """Run a Bugzilla quicksearch and return the raw REST reply."""
    search = dict(include_fields='id,summary,product,component', quicksearch=query)
    return bz_rest('bug', params=search)
def bz_get_query_bugs(params: Dict, exclude: Set[int]) -> List[int]:
    """Run a bug search with params and return matching ids not in exclude.

    A default ``include_fields`` is written into the caller's params dict
    when it does not specify one.
    """
    params.setdefault('include_fields', 'id,summary,product,component')
    reply = bz_rest('bug', params=params)
    return [entry['id'] for entry in reply['bugs'] if entry['id'] not in exclude]
def get_human_size(size: int, decimals: int = 2) -> str:
    """Format a byte count using binary units (B, KiB, ... PiB).

    The value is divided by 1024 until it drops below 1024 or the largest
    unit is reached, then rendered with the requested number of decimals.
    """
    units = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB')
    last = len(units) - 1
    idx = 0
    while idx < last and not size < 1024.0:
        size /= 1024.0
        idx += 1
    return f'{size:.{decimals}f} {units[idx]}'
def bz_get_attachment_by_aid(aid: int, include_fields: Optional[str] = None) -> Dict:
    """Fetch attachment aid, optionally limited to include_fields.

    When the reply carries a ``size``, a ``human_size`` string is added to
    the returned dict.

    Raises:
        LookupError: if the reply does not contain the attachment.
    """
    path = f'bug/attachment/{aid}'
    if include_fields is None:
        reply = bz_rest(path)
    else:
        reply = bz_rest(path, params={'include_fields': include_fields})
    for key, adata in reply['attachments'].items():
        if int(key) != aid:
            continue
        if 'size' in adata:
            adata['human_size'] = get_human_size(adata['size'])
        return adata
    raise LookupError('No matching attachment_id found: %s' % aid)
def bz_get_all_comments_by_bid(bid: int) -> List[Dict]:
    """Return all comments of bug bid.

    Returns an empty list when the reply has no entry for the bug, so callers
    can always iterate the result (previously the function fell off the end
    and returned None in that case).
    """
    bdata = bz_rest(f'bug/{bid}/comment')
    for rbid, rbdata in bdata['bugs'].items():
        if int(rbid) == bid:
            return rbdata['comments']
    return list()
def bz_get_comments_for_bid_since(bid: int, since: str) -> List[Dict]:
    """Return the comments of bug bid newer than since (sent as ``new_since``).

    Returns an empty list when the reply has no entry for the bug, so callers
    can always iterate the result (previously the function fell off the end
    and returned None in that case).
    """
    params = {
        'new_since': since,
        'id': bid,
    }
    bdata = bz_rest(f'bug/{bid}/comment', params=params)
    for rbid, rbdata in bdata['bugs'].items():
        if int(rbid) == bid:
            return rbdata['comments']
    return list()
def bz_get_newest_comments_for_bid(bid: int, include_private: bool = False) -> List[Dict]:
    """Return the comments on bug bid that have not been processed yet.

    If a last notification check value is stored for the bug, comments newer
    than it are requested. Otherwise all comments are fetched and those with
    an id at or below the highest comment id in msgid_bug_mapping are dropped.
    Private comments are skipped unless include_private is set.
    """
    try:
        when = db_get_notify_last_check(bid)
    except LookupError:
        # grab the highest cid we know about
        engine, dbconn = db_get_sa()
        t_bmap = sa.Table('msgid_bug_mapping', sa.MetaData(), autoload=True, autoload_with=engine)
        query = sa.select(sa.func.max(t_bmap.c.comment_id)).where(t_bmap.c.bug_id == bid)
        rows = dbconn.execute(query).fetchall()
        c_max = rows[0][0] if rows else None
        return [
            cdata for cdata in bz_get_all_comments_by_bid(bid)
            if not (c_max and cdata['id'] <= c_max)
            and (include_private or not cdata['is_private'])
        ]
    logger.debug('Getting newest comments since %s', when)
    return [
        cdata for cdata in bz_get_comments_for_bid_since(bid, when)
        if include_private or not cdata['is_private']
    ]
def msg_get_inre_msgids(msg: email.message.EmailMessage) -> List[str]:
    """Collect the message-ids named in In-Reply-To and References.

    In-Reply-To entries come first; duplicates and blank entries are dropped
    while the order of first appearance is kept.
    """
    pairs = list()
    for hdrname in ('in-reply-to', 'references'):
        if msg.get(hdrname):
            pairs += email.utils.getaddresses([str(x) for x in msg.get_all(hdrname, [])])
    msgids = list()
    for _, candidate in pairs:
        if candidate.strip() and candidate not in msgids:
            msgids.append(candidate)
    logger.debug('msgids=%s', str(msgids))
    return msgids
def _date_to_timestamp(datestr: str) -> Optional[float]:
    """Convert an RFC 2822 date string to a POSIX timestamp, or None if unparseable."""
    parsed = email.utils.parsedate_tz(datestr)
    if parsed is None:
        return None
    try:
        return email.utils.mktime_tz(parsed)
    except (ValueError, OverflowError):
        # Syntactically valid but out-of-range dates
        return None


def sort_msgs_by_received(msgs: List[email.message.EmailMessage]) -> List[email.message.EmailMessage]:
    """Return msgs ordered by when they were received, oldest first.

    For each message the latest date found in its Received headers is used;
    when there is none, the Date header serves as fallback. Messages with no
    usable date at all are dropped from the result.

    All dates are converted to POSIX timestamps so that values taken from
    Received and from Date compare correctly across timezones.
    """
    tosort = list()
    for msg in msgs:
        latest = None
        # get_all() returns None without a default when the header is absent
        for rhdr in msg.get_all('Received', []):
            # The received headers are pretty free-form, but generally will end with ; datetimeinfo
            chunks = str(rhdr).rsplit(';', maxsplit=1)
            if len(chunks) < 2:
                continue
            stamp = _date_to_timestamp(chunks[1].strip())
            if stamp is None:
                continue
            # use the latest date
            if latest is None or latest < stamp:
                latest = stamp
        if latest is None and msg.get('Date'):
            # Use the Date header as fallback
            latest = _date_to_timestamp(str(msg.get('Date')))
        if latest is None:
            logger.debug('Message without a date: %s!', msg.get('Message-ID'))
            continue
        tosort.append((latest, msg))
    return [msg for _, msg in sorted(tosort, key=lambda x: x[0])]
def msg_get_inre_bid_cid(msg: email.message.EmailMessage) -> Tuple[int, Optional[int]]:
    """Find the bug and comment that msg is a reply to.

    Every message-id referenced by msg is looked up in the database. When
    several match, the one with the highest comment id wins, as that is the
    latest matching comment.

    Raises:
        LookupError: if msg references nothing, or nothing that is known.
    """
    msgids = msg_get_inre_msgids(msg)
    if not msgids:
        logger.debug('No references in msg')
        raise LookupError('No references found')
    res = list()
    for msgid in msgids:
        logger.debug('Trying to match msgid=%s', msgid)
        try:
            (bid, cid) = db_get_bid_cid_by_msgid(msgid)
            logger.debug('Matched msgid=%s to bid=%s, cid=%s', msgid, bid, cid)
            res.append((bid, cid))
        except LookupError:
            # Try the next one until we run out
            logger.debug('No match for msgid=%s', msgid)
    if not res:
        logger.debug('No reference matched database records')
        raise LookupError('Nothing matching in the db')
    # Use the highest cid, which is going to be the latest matching comment.
    # The lookup returns cid=None for bugs that turned out to be duplicates;
    # rank those lowest instead of letting the None/int comparison raise.
    return max(res, key=lambda x: -1 if x[1] is None else x[1])
def msg_get_recipients(msg: email.message.EmailMessage) -> Set[str]:
    """Return the lower-cased addresses found in the To, Cc and From headers."""
    pairs = list()
    for hdrname in ('to', 'cc', 'from'):
        if msg.get(hdrname):
            values = [str(x) for x in msg.get_all(hdrname, [])]
            pairs.extend(email.utils.getaddresses(values))
    return {addr.lower() for _, addr in pairs}
def bz_get_product_component_by_bid(bid: int) -> Tuple[str, str]:
    """Return ``(product, component)`` of bug bid, after resolving duplicates."""
    bug = bz_get_bug(bid, resolve_dupes=True)
    product, component = bug['product'], bug['component']
    return product, component
def get_product_component_by_recipients(recipients: Set[str]) -> Tuple[str, str]:
    """Map recipient addresses to the first product/component that claims one.

    Each component may list ``recipients`` as fnmatch-style masks; the first
    mask matching any of the given addresses decides the result.

    Raises:
        LookupError: if no components are configured or nothing matches.
    """
    components = get_config().get('components')
    if not components:
        logger.debug('No components found in config')
        raise LookupError('No components defined in config')
    for recipient in recipients:
        logger.debug('Matching %s', recipient)
        for bz_product, bz_components in components.items():
            for bz_component, c_config in bz_components.items():
                masks = c_config.get('recipients')
                if not masks:
                    continue
                if any(fnmatch(recipient, mask) for mask in masks):
                    logger.debug('Matched %s with product=%s, component=%s', recipient, bz_product, bz_component)
                    return bz_product, bz_component
    raise LookupError('No matches for any recipients')
def get_newbug_payload_by_product_component(product: str, component: str) -> Dict:
    """Return a new-bug payload pre-filled for product/component.

    Starts from the component's configured ``payload`` (if any) and sets the
    ``product`` and ``component`` keys.

    The returned dict is a copy, so that fields the caller adds for one bug
    do not leak into the shared configuration and thereby into every bug
    created afterwards.
    """
    config = get_config()
    try:
        payload = dict(config['components'][product][component]['payload'])
    except KeyError:
        payload = dict()
    payload['product'] = product
    payload['component'] = component
    return payload
def msg_get_author(msg: email.message.EmailMessage) -> Tuple[str, str]:
    """Return ``(name, address)`` taken from the From header.

    A placeholder address is used when there is no From header, and a
    placeholder name when the header carries no display name.
    """
    name, addr = '', 'missing@address.local'
    fromhdr = msg.get('from')
    if fromhdr:
        name, addr = email.utils.getaddresses([fromhdr])[0]
    return (name or 'Zorro Boogs', addr)
def msg_get_payload(msg: email.message.EmailMessage, strip_quoted: bool = False,
                    strip_signature: bool = True) -> str:
    """Return the decoded text/plain body of msg.

    Args:
        msg: the message to read.
        strip_quoted: drop lines that start with '> '.
        strip_signature: cut everything from the last '-- ' separator line on.

    Returns:
        The body text, or an empty string when the message has no usable
        text/plain part (for example HTML-only mail).
    """
    mp = msg.get_body(preferencelist=('plain',))
    if mp is None:
        # No text/plain part at all
        return ''
    bbody = mp.get_payload(decode=True)
    if bbody is None:
        return ''
    cs = mp.get_content_charset() or 'utf-8'
    try:
        cpay = bbody.decode(cs, errors='replace')
    except LookupError:
        # The message declares a charset Python does not know
        cpay = bbody.decode('utf-8', errors='replace')
    if strip_signature:
        # Strip signature if we find it
        cbody = cpay.rsplit('\n-- \n', maxsplit=1)[0]
    else:
        cbody = cpay
    if not strip_quoted:
        return cbody
    return '\n'.join(line for line in cbody.splitlines() if not line.startswith('> '))
def msg_parse_for_bug(msg: email.message.EmailMessage) -> Tuple[str, Tuple[str, str], str, str, List[Dict]]:
    """Extract what is needed to file msg as a bug or comment.

    Returns:
        ``(msgid, author, subject, body, attachments)``.
    """
    msgid = b4.LoreMessage.get_clean_msgid(msg)
    body = msg_get_payload(msg)
    subject = b4.LoreSubject(msg.get('Subject', '')).subject
    attachments = msg_get_valid_attachments(msg)
    return msgid, msg_get_author(msg), subject, body, attachments
def msg_get_valid_attachments(msg: email.message.EmailMessage) -> List[Dict]:
    """Build attachment payloads for all acceptable attachments in msg.

    Parts are considered when their content disposition is 'attachment'.
    Parts whose content type matches a mask in ``mimetypes.deny`` are
    skipped, as are parts without a decodable payload (container parts such
    as an attached message), which cannot be base64-encoded.

    Returns:
        A list of dicts with ``file_name``, ``content_type``, ``summary`` and
        base64-encoded ``data`` keys.
    """
    # Get all good attachments
    mts = get_config().get('mimetypes')
    denied = mts['deny'] if mts and 'deny' in mts else ()
    atts = list()
    for part in msg.walk():
        if part.get_content_disposition() != 'attachment':
            continue
        ct = part.get_content_type()
        if any(fnmatch(ct, dmt) for dmt in denied):
            logger.debug('Skipping denied mime-type attachement: %s', ct)
            continue
        databytes = part.get_payload(decode=True)
        if databytes is None:
            # get_payload(decode=True) returns None for multipart containers
            logger.debug('Skipping attachment without a decodable payload: %s', ct)
            continue
        filename = part.get_filename()
        if not filename:
            filename = 'unnamed.txt'
            ct = 'text/plain'
        atts.append({
            'file_name': filename,
            'content_type': ct,
            'summary': filename,
            'data': base64.b64encode(databytes).decode(),
        })
    return atts
def get_recipients_by_product_component(product: str, component: str) -> Set[str]:
    """Return the configured notification addresses for product/component.

    Combines the global ``notify.alwayscc`` with the component's own
    ``alwayscc``; only when both yield nothing are the component's
    ``recipients`` used as a fallback.
    """
    recip = set()
    config = get_config()
    try:
        global_cc = config['notify']['alwayscc']
    except KeyError:
        pass
    else:
        recip.update(global_cc)
        logger.debug('added global alwayscc: %s', global_cc)
    try:
        component_cc = config['components'][product][component]['alwayscc']
    except KeyError:
        pass
    else:
        recip.update(component_cc)
        logger.debug('added %s/%s alwayscc: %s', product, component, component_cc)
    if recip:
        return recip
    try:
        fallback = config['components'][product][component]['recipients']
    except KeyError:
        pass
    else:
        recip.update(fallback)
        logger.debug('added %s/%s fallback recipients: %s', product, component, fallback)
    return recip
def get_maintainer_data() -> Dict[str, Dict[str, set]]:
    """Load and cache the maintainers data from ``bugzilla.maintainers_url``.

    The source is read from a local file for ``file://`` URLs and fetched
    over HTTP otherwise. The result maps each subsystem heading to the
    addresses found on its 'M:', 'R:' and 'L:' lines. Parsing happens only
    while MAINT_CACHE is empty.
    """
    global MAINT_CACHE
    if len(MAINT_CACHE):
        return MAINT_CACHE

    config = get_config()
    ses = get_requests_session()
    murl = config['bugzilla']['maintainers_url']
    if murl.startswith('file://'):
        with open(murl.replace('file://', ''), 'r') as fh:
            mdata = fh.read()
    else:
        res = ses.get(murl)
        res.raise_for_status()
        mdata = res.text

    tags = ('M:', 'R:', 'L:')
    heading = None
    subsystem = None
    for line in mdata.splitlines():
        if len(line) < 2 or not line[0].strip():
            # Too short, or starts with whitespace: the current entry ends
            subsystem = None
            continue
        if line[:2] not in tags:
            # Remember it; it becomes the heading if tagged lines follow
            heading = line
            continue
        if not subsystem:
            subsystem = heading
        addr = email.utils.parseaddr(line[2:])
        if subsystem:
            entry = MAINT_CACHE.setdefault(subsystem, {'M': set(), 'R': set(), 'L': set()})
            entry[line[0]].add(addr[1])
    return MAINT_CACHE
def get_recipients_by_subsystem(subsystem: str, limit: Optional[Set[str]] = None) -> Set[str]:
    """Return the maintainer addresses listed for subsystem.

    When limit is given, only the entry types it names ('M', 'R', 'L') are
    included. An unknown subsystem yields an empty set.
    """
    recips = set()
    mdata = get_maintainer_data()
    if subsystem not in mdata:
        return recips
    for mtype, addrs in mdata[subsystem].items():
        if not limit or mtype in limit:
            recips.update(addrs)
    return recips
def get_privacy_mode(product: str, component: str) -> bool:
    """Tell whether privacy mode applies to product/component.

    The global ``bugzilla.privacy_mode`` wins when set; otherwise the
    component's ``bz_privacy_mode`` setting (default False) is returned.
    """
    if get_config()['bugzilla'].get('privacy_mode', False):
        return True
    return get_component_config(product, component).get('bz_privacy_mode', False)
def get_bug_recipients(bid: int) -> Set[str]:
    """Assemble everyone who should be notified about bug bid.

    Sources, in order: addresses stored in the database, the configured
    product/component addresses, the bug's cc/assignee/creator (unless
    privacy mode is on), and the maintainers of the bug's subsystem. Finally,
    addresses matching a ``notify.neverto`` mask are removed.
    """
    # Get all db-stored recipients
    # TODO: implement "onlyto"
    allrecip = set()
    try:
        allrecip.update(db_get_recipients(bid))
    except LookupError:
        logger.debug('No in-database recipients for bid=%s', bid)

    # Now get all bug cc recipients
    bdata = bz_get_bug(bid, resolve_dupes=True)
    product, component = bdata['product'], bdata['component']
    config = get_config()
    bugr = get_recipients_by_product_component(product, component)
    if not get_privacy_mode(product, component):
        bugr.update(bdata['cc'])
        bugr.add(bdata['assigned_to'])
        bugr.add(bdata['creator'])
    else:
        logger.debug('Privacy mode on, not adding addresses from the bug')
    allrecip.update(bugr)

    subsystem_cf = config['bugzilla'].get('subsystem_cf')
    if subsystem_cf:
        # Is there a default_subsystem defined for this product/component?
        subsystem = bdata.get(subsystem_cf) or get_component_config(product, component).get('default_subsystem')
        if subsystem:
            allrecip.update(get_recipients_by_subsystem(subsystem))

    # Remove "neverto" addresses
    for mask in config['notify'].get('neverto', list()):
        for addr in [x for x in allrecip if fnmatch(x, mask)]:
            logger.debug('Removed %s because it matched neverto=%s', addr, mask)
            allrecip.remove(addr)
    return allrecip
def make_msgid(bid: int, cid: Optional[int]) -> str:
    """Generate a Message-ID for a notification about bug bid / comment cid.

    The id has the form ``<YYYYMMDD-b{bid}[c{count}]-{random}@{host}>``, where
    host is taken from ``bugzilla.url``.
    """
    bzloc = urllib.parse.urlparse(get_config()['bugzilla']['url'])
    slug = f'b{bid}'
    if bid and cid:
        slug += 'c%s' % bz_get_count_by_bid_cid(bid, cid)
    datestamp = datetime.date.today().strftime('%Y%m%d')
    nonce = uuid.uuid4().hex[:12]
    return f'<{datestamp}-{slug}-{nonce}@{bzloc.netloc}>'
def get_msgid(msg: email.message.EmailMessage) -> str:
    """Return the message-id of msg as cleaned up by b4."""
    clean_msgid = b4.LoreMessage.get_clean_msgid(msg)
    return clean_msgid
def notify_bug(bid: int, cid: Optional[int], msg: email.message.EmailMessage, inre_cid: Optional[int] = None,
               dry_run: bool = False) -> Optional[str]:
    """Send msg as a notification about bug bid and return its Message-ID.

    Headers missing from msg are filled in: From (``notify.fromaddr``), To
    (the bug's recipients), Message-ID, In-Reply-To/References (the message
    mapped to inre_cid, or failing that any message mapped to the bug) and
    Subject. Headers that are already present are left alone.

    Returns:
        The Message-ID of the sent message, or None when the bug has no
        recipients and nothing was sent.
    """
    bdata = bz_get_bug(bid, resolve_dupes=True)
    config = get_config()
    if not msg.get('From'):
        msg['From'] = config['notify'].get('fromaddr')
    if not msg.get('To'):
        recipients = get_bug_recipients(bid)
        if not recipients:
            logger.info('No recipients for bug %s, not notifying', bid)
            return None
        msg['To'] = b4.format_addrs([('', x) for x in recipients])
    if not msg.get('Message-ID'):
        msg['Message-ID'] = make_msgid(bid, cid)
    if not msg.get('In-Reply-To'):
        inre_msgid = None
        try:
            inre_msgid = db_get_msgid_by_bid_cid(bid, inre_cid)
        except LookupError:
            logger.debug('Could not find msgid matching bid=%s, cid=%s', bid, inre_cid)
            # Find anything
            try:
                inre_msgid = db_get_msgid_by_bid_cid(bid, None)
            except LookupError:
                pass
        if inre_msgid:
            msg['In-Reply-To'] = f'<{inre_msgid}>'
            msg['References'] = f'<{inre_msgid}>'
    # Only fill in Subject when the caller did not provide one: assigning it
    # again would add a second Subject header (or raise, depending on the
    # message policy). Replies get the usual "Re: " prefix.
    if not msg.get('Subject'):
        if msg.get('In-Reply-To'):
            msg['Subject'] = 'Re: %s' % bdata['summary']
        else:
            msg['Subject'] = bdata['summary']
    msg['X-Bugzilla-Product'] = bdata['product']
    msg['X-Bugzilla-Component'] = bdata['component']
    msg['X-Mailer'] = f'{__APPNAME__} {__VERSION__}'
    # Should we add other X-B headers?
    # If we have notify.smtpserver and notify.smtpport defined, use that,
    # otherwise use b4's get_smtp method
    smtpserver = config['notify'].get('smtpserver')
    smtpport = int(config['notify'].get('smtpport', '25'))
    if smtpserver and smtpport:
        import smtplib
        smtp = smtplib.SMTP(smtpserver, smtpport)
    else:
        smtp, fromaddr = b4.get_smtp(dryrun=dry_run)
    b4.send_mail(smtp, [msg], fromaddr=config['notify'].get('fromaddr'), dryrun=dry_run)
    return msg.get('Message-ID')
def add_bot_signature(body: str) -> str:
    """Append the rendered ``botsig`` template to body and return the result."""
    config = get_config()
    sigtpt = Template(config['templates'].get('botsig'))
    signature = sigtpt.safe_substitute({
        'myname': config['bugzilla'].get('name'),
        'appname': __APPNAME__,
        'appver': __VERSION__,
    })
    return body + signature
def pi_get_mbox(resp) -> List[email.message.EmailMessage]:
    """Turn a gzipped mbox HTTP response into a list of messages.

    Raises:
        LookupError: on a non-200 status or an empty mbox.
    """
    status = resp.status_code
    if status != 200:
        raise LookupError('Server returned an error: %s' % status)
    raw_mbox = gzip.decompress(resp.content)
    resp.close()
    if len(raw_mbox) == 0:
        raise LookupError('mbox is empty')
    # noinspection PyTypeChecker
    return b4.split_and_dedupe_pi_results(raw_mbox)
def pi_get_query_results(query_url: str) -> List[email.message.EmailMessage]:
    """Run a public-inbox search query and return the matching messages."""
    netloc = urllib.parse.urlparse(query_url).netloc
    logger.debug('query=%s', query_url)
    logger.debug('grabbing search results from %s', netloc)
    # For the query to retrieve a mbox file, we need to send a POST request
    response = get_requests_session().post(query_url, data='')
    return pi_get_mbox(response)
def pi_get_new_since(url, msgid, since: str) -> List[email.message.EmailMessage]:
    """Fetch messages related to msgid from the public-inbox at url.

    With since set, a ``dt:{since}..`` search is run below the message URL.
    Otherwise the whole thread mbox is downloaded and reduced to the strict
    thread around msgid.
    """
    netloc = urllib.parse.urlparse(url).netloc
    logger.debug('grabbing thread from %s', netloc)
    mbox_url = f"{url.rstrip('/')}/{urllib.parse.quote_plus(msgid)}"
    session = get_requests_session()
    if not since:
        mbox_url += '/t.mbox.gz'
        logger.debug('mbox_url=%s', mbox_url)
        thread = pi_get_mbox(session.get(mbox_url))
        # for whole threads, return a strict thread
        return b4.get_strict_thread(thread, msgid)
    mbox_url += '/?x=m&q=' + urllib.parse.quote_plus(f'dt:{since}..')
    return pi_get_query_results(mbox_url)
def pi_get_sorted_thread(url: str, msgid: str, since: Optional[str] = None) -> List[email.message.EmailMessage]:
    """Fetch the messages for msgid and return them sorted by sort_msgs_by_received().

    Raises:
        LookupError: if no messages were found.
    """
    found = pi_get_new_since(url, msgid, since)
    if found:
        return sort_msgs_by_received(found)
    raise LookupError('No new messages matching url=%s, msgid=%s, since=%s' % (url, msgid, since))
def db_get_sa() -> Tuple[sa.engine.Engine, sa.engine.Connection]:
    """Return the shared SQLAlchemy engine and connection, creating them on first use.

    The database URL is read from ``db.dburl`` in the config, falling back to
    the DBURL environment variable. For SQLite databases the schema is
    created when the ``meta`` table does not exist yet.

    Raises:
        LookupError: if no database URL is configured.
    """
    global SACONN, SAENGINE
    if SACONN is None:
        # The whole [db] section may be absent when the URL comes from the
        # environment, so never index into it directly
        dbconf = get_config().get('db') or dict()
        dburl = dbconf.get('dburl')
        if not dburl:
            # we may pass it via env, which lets us better manage secrets
            dburl = os.getenv('DBURL')
        if not dburl:
            raise LookupError('CRITICAL: db.dburl not set in config file')
        db_pool_recycle = int(dbconf.get('dbpoolrecycle', '300'))
        SAENGINE = sa.create_engine(dburl, pool_recycle=db_pool_recycle)
        SACONN = SAENGINE.connect()
        if SAENGINE.driver == 'pysqlite':
            md = sa.MetaData()
            try:
                t_meta = sa.Table('meta', md, autoload=True, autoload_with=SAENGINE)
                q = sa.select([t_meta.c.var_value]).where(t_meta.c.var_key == 'schema')
                row = SACONN.execute(q).fetchone()
                # A meta table without a schema row yields no result
                dbver = row[0] if row else None
                # Future logic to upgrade database here
                logger.debug('dbver=%s', dbver)
            except NoSuchTableError:
                db_init_sa_sqlite_db(SAENGINE, SACONN)
    return SAENGINE, SACONN
def db_init_sa_sqlite_db(engine: sa.engine.Engine, dbconn: sa.engine.Connection):
    """Create the tables and indexes of a fresh SQLite database.

    Defines the meta, msgid_bug_mapping, commit_bug_mapping and recipients
    tables, creates them on engine, and records the schema version in meta.
    """
    logger.info('Setting up SQLite database')
    schema = sa.MetaData()

    t_meta = sa.Table(
        'meta', schema,
        sa.Column('var_key', sa.Text()),
        sa.Column('var_value', sa.Text()),
    )
    sa.Index('idx_meta_key_value', t_meta.c.var_key, t_meta.c.var_value, unique=True)

    t_bmap = sa.Table(
        'msgid_bug_mapping', schema,
        sa.Column('row_id', sa.Integer(), primary_key=True),
        sa.Column('bug_id', sa.Integer(), nullable=False),
        sa.Column('comment_id', sa.Integer(), nullable=False),
        sa.Column('message_id', sa.Text(), nullable=False),
    )
    sa.Index('idx_msgid_bugid_commentid', t_bmap.c.message_id, t_bmap.c.bug_id, t_bmap.c.comment_id, unique=True)
    sa.Index('idx_msgid_commentid', t_bmap.c.message_id, t_bmap.c.comment_id, unique=True)
    sa.Index('idx_msgid_bugid', t_bmap.c.message_id, t_bmap.c.bug_id, unique=True)

    t_cmap = sa.Table(
        'commit_bug_mapping', schema,
        sa.Column('row_id', sa.Integer(), primary_key=True),
        sa.Column('bug_id', sa.Integer(), nullable=False),
        sa.Column('commit_id', sa.Text(), nullable=False),
    )
    # a commit can close multiple bugs
    sa.Index('idx_commitid_bugid', t_cmap.c.commit_id, t_cmap.c.bug_id, unique=False)

    t_recip = sa.Table(
        'recipients', schema,
        sa.Column('row_id', sa.Integer(), primary_key=True),
        sa.Column('bug_id', sa.Integer(), nullable=False),
        sa.Column('email', sa.Text(), nullable=False),
    )
    sa.Index('idx_bugid_email', t_recip.c.bug_id, t_recip.c.email, unique=True)

    schema.create_all(engine)
    dbconn.execute(sa.insert(t_meta).values(var_key='schema', var_value=str(__DBSCHEMA__)))