| #!/usr/bin/env python3 |
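"""
Helper library for bugspray: glue between mailing list threads (public-inbox/lore)
and Bugzilla -- REST API wrappers, message-id to bug/comment mappings kept in a
local database, and outgoing notification mail.
"""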
import email
import email.message
import email.policy
import email.utils
import email.parser
| import requests |
| import logging |
| import base64 |
| import b4 |
| import sys |
| import os |
| import uuid |
| import datetime |
| import gzip |
| |
| import urllib.parse |
| |
| import sqlalchemy as sa |
| |
| from typing import Dict, Tuple, List, Set, Optional |
| from fnmatch import fnmatch |
| from string import Template |
| |
| from sqlalchemy.exc import NoSuchTableError |
| |
| # Policy we use for saving mail locally |
| emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None, |
| message_factory=email.message.EmailMessage) |
| |
| # force b4 to use EmailMessage factory |
| b4.emlpolicy = emlpolicy |
| |
| REQSESSION = None |
| CONFIG = dict() |
| |
| __APPNAME__ = 'bugspray' |
| __VERSION__ = '0.1-dev' |
| |
| __DBSCHEMA__ = 1 |
| |
| logger = logging.getLogger(__APPNAME__) |
| b4.logger = logger |
| |
| SACONN = None |
| SAENGINE = None |
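# REST_CACHE memoizes parameter-less GET requests against the Bugzilla REST API;
# MAINT_CACHE holds parsed MAINTAINERS-style data (subsystem -> sets of M/R/L addresses)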
| REST_CACHE = dict() |
| MAINT_CACHE = dict() |
| |
| |
| def get_msg_from_stdin() -> Optional[email.message.EmailMessage]: |
| if not sys.stdin.isatty(): |
| msg = email.parser.BytesParser(policy=emlpolicy).parsebytes(sys.stdin.buffer.read()) |
| return msg # noqa |
| return None |
| |
| |
| def get_requests_session(): |
| global REQSESSION |
| if REQSESSION is None: |
| REQSESSION = requests.session() |
| REQSESSION.headers.update({'User-Agent': f'{__APPNAME__}/{__VERSION__}'}) |
| return REQSESSION |
| |
| |
| def get_config() -> Dict: |
| return CONFIG |
| |
| |
| def get_component_config(product: str, component: str) -> Dict: |
| config = get_config() |
| try: |
| return config['components'][product][component] |
| except KeyError: |
| return dict() |
| |
| |
| def get_template_by_bid(tptname: str, bid: int) -> Template: |
| product, component = bz_get_product_component_by_bid(bid) |
| return get_template_by_product_component(tptname, product, component) |
| |
| |
| def get_template_by_product_component(tptname: str, product: str, component: str) -> Template: |
| config = get_config() |
| try: |
| return Template(config['components'][product][component]['templates'][tptname]) |
| except KeyError: |
| pass |
| return Template(config['templates'][tptname]) |
| |
| |
| def get_msgid_link(msgid: str) -> str: |
| config = get_config() |
| linkmask = config['bugzilla'].get('linkmask', 'https://lore.kernel.org/{msgid}') |
| return linkmask.format(msgid=urllib.parse.quote_plus(msgid, safe='@')) |
| |
| |
| def get_pi_search(title: str) -> dict: |
| config = get_config() |
| try: |
| pconf = config['publicinbox']['searches'][title] |
| except KeyError: |
| raise LookupError('No such publicinbox search: %s' % title) |
| if 'inherits' in pconf and title != pconf['inherits']: |
| mconf = get_pi_search(pconf['inherits']) |
| mconf.update(pconf) |
| return mconf |
| return pconf |
| |
| |
def bz_rest(path: str, payload: Optional[Dict] = None, params: Optional[Dict] = None, method: str = 'GET') -> Dict:
| global REST_CACHE |
| # We only cache GETs without any params |
| if method == 'GET' and not params and path in REST_CACHE: |
| logger.debug('Using cached data') |
| return REST_CACHE[path] |
| |
| config = get_config() |
| url = '{BZURL}/{path}'.format(BZURL=config['bugzilla'].get('resturl').rstrip('/'), path=path) |
| logger.debug('Querying url=%s', url) |
| myparams = dict() if params is None else dict(params) |
| apikey = config['bugzilla'].get('apikey') |
| if not apikey: |
| apikey = os.getenv('BZAPIKEY', '') |
| myparams['api_key'] = apikey |
| ses = get_requests_session() |
| if method == 'GET': |
| res = ses.get(url, params=myparams) |
| elif method == 'POST': |
| res = ses.post(url, params=myparams, json=payload) |
| elif method == 'PUT': |
| res = ses.put(url, params=myparams, json=payload) |
| else: |
        logger.critical('Unknown method=%s', method)
| raise RuntimeError('Unknown method %s' % method) |
| |
| # logger.debug('res=%s', res.text) |
| if res.status_code == 404: |
| raise LookupError('Bugzilla returned 404: %s' % res.text) |
| |
| # for every other error, we just BT for now |
| res.raise_for_status() |
| rdata = res.json() |
| if method == 'GET' and not params: |
| # Cache it |
| REST_CACHE[path] = rdata |
| elif method != 'GET': |
| # We changed something, so nuke our cache |
| logger.debug('Clearing cache') |
| REST_CACHE = dict() |
| |
| return rdata |
| |
| |
| def bz_add_atts_to_bug(bid: int, atts: List[Dict]) -> List[int]: |
| aids = list() |
| for att in atts: |
| att['ids'] = [bid] |
| rdata = bz_rest(f'bug/{bid}/attachment', att, method='POST') |
        logger.debug('Created new attachment %s', ', '.join(str(x) for x in rdata['ids']))
| aids += rdata['ids'] |
| return aids |
| |
| |
| def bz_set_bug_status_resolution(bid: int, status: str, resolution: str) -> None: |
| payload = { |
| 'status': status, |
| 'resolution': resolution, |
| } |
| logger.debug('Setting bug_id=%s status=%s resolution=%s', bid, status, resolution) |
| bz_rest(f'bug/{bid}', payload, method='PUT') |
| |
| |
| def bz_add_new_bug(payload: Dict) -> Tuple[int, int]: |
| if 'version' not in payload: |
| payload['version'] = 'unspecified' |
| rdata = bz_rest('bug', payload, method='POST') |
| bid = rdata['id'] |
| logger.debug('Created new bug %s', bid) |
| # Apparently, we don't get comment-id info from new bug creation |
| cid = bz_get_cid_by_bid_count(bid) |
| return bid, cid |
| |
| |
| def bz_add_new_comment(bid: int, comment: str) -> int: |
| payload = { |
| 'id': bid, |
| 'comment': comment |
| } |
| rdata = bz_rest(f'bug/{bid}/comment', payload, method='POST') |
| cid = rdata['id'] |
| logger.debug('Created new comment %s', cid) |
| return cid |
| |
| |
| def bz_get_bug(bid: int, resolve_dupes=False) -> Dict: |
| if resolve_dupes: |
| bid = bz_dedupe_bid(bid) |
| |
| path = f'bug/{bid}' |
| rdata = bz_rest(path) |
| for bdata in rdata['bugs']: |
| if bdata['id'] == bid: |
| return bdata |
| raise RuntimeError('Could not get bug info for %s' % bid) |
| |
| |
| def bz_get_user(username: str) -> Dict: |
| path = f'user/{username}' |
| try: |
| rdata = bz_rest(path) |
| for udata in rdata['users']: |
| if udata['name'] == username: |
| return udata |
| except LookupError: |
| pass |
| |
| raise LookupError('Could not get user info for %s' % username) |
| |
| |
| def bz_get_user_groups(username: str) -> Set: |
| udata = bz_get_user(username) |
| return set([x['name'] for x in udata['groups']]) |
| |
| |
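# Bugzilla comments carry both a globally unique id ("id") and a per-bug ordinal
# ("count", where count=0 is the bug description); the two helpers below translate
# between them.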
| def bz_get_cid_by_bid_count(bid: int, count: int = 0) -> int: |
| bdata = bz_rest(f'bug/{bid}/comment') |
| for rbid, rbdata in bdata['bugs'].items(): |
| if int(rbid) != bid: |
| continue |
| for comment in rbdata['comments']: |
| if comment['count'] == count: |
| logger.debug('cid for %s/c%s is %s', bid, count, comment['id']) |
| return comment['id'] |
| raise LookupError('No cid matching bid=%s count=%s' % (bid, count)) |
| |
| |
| def bz_get_count_by_bid_cid(bid: int, cid: int) -> int: |
| bdata = bz_rest(f'bug/{bid}/comment') |
| for rbid, rbdata in bdata['bugs'].items(): |
| if int(rbid) != bid: |
| continue |
| for comment in rbdata['comments']: |
| if comment['id'] == cid: |
| logger.debug('count for %s/%s is c%s', bid, cid, comment['count']) |
| return comment['count'] |
    raise LookupError('No match for bid=%s cid=%s' % (bid, cid))
| |
| |
| def bz_dedupe_bid(bid: int) -> int: |
| bdata = bz_get_bug(bid) |
| if bdata.get('dupe_of'): |
| # Nothing wrong with recursion |
| return bz_dedupe_bid(bdata['dupe_of']) |
| return bid |
| |
| |
| def bz_check_user_allowed(uid: str, product: str, component: str) -> bool: |
| cconf = get_component_config(product, component) |
| mustgroups = cconf.get('pi_must_bz_groups') |
| if mustgroups is None: |
| # No restrictions, anyone can do anything they like |
| return True |
| try: |
| udata = bz_get_user(uid) |
| except LookupError: |
| logger.debug('Could not find user %s in bugzilla', uid) |
| return False |
| |
| for mustgroup in mustgroups: |
| for ugroup in udata.get('groups', list()): |
| if ugroup['name'] == mustgroup: |
| logger.debug('%s mustgroup matches %s', uid, mustgroup) |
| return True |
| |
| logger.debug('%s not member of %s', uid, mustgroups) |
| return False |
| |
| |
| def bz_assign_bug(bid: int, uid: str) -> None: |
| logger.info('Assigning bug %s to %s', bid, uid) |
| path = f'bug/{bid}' |
| payload = { |
| 'assigned_to': uid, |
| } |
| bz_rest(path, payload=payload, method='PUT') |
| |
| |
| def bz_set_bug_product_component(bid: int, product: str, component: str) -> None: |
| logger.info('Setting bug %s to product=%s, component=%s', bid, product, component) |
| path = f'bug/{bid}' |
| payload = { |
| 'product': product, |
| 'component': component, |
| } |
| bz_rest(path, payload=payload, method='PUT') |
| |
| |
| def bz_set_bug_subsystem(bid: int, subsystem: str) -> None: |
| config = get_config() |
| subsystem_cf = config['bugzilla'].get('subsystem_cf') |
| if not subsystem_cf: |
| logger.info('No subsystem_cf defined for this bugzilla') |
| return |
| logger.info('Setting bug %s subsystem to %s', bid, subsystem) |
| path = f'bug/{bid}' |
| payload = { |
| subsystem_cf: subsystem |
| } |
| bz_rest(path, payload=payload, method='PUT') |
| |
| |
| def db_get_query_last_check(product: str, component: str) -> str: |
| key = f'query_{product}_{component}' |
| return db_get_meta_value(key) |
| |
| |
| def db_store_query_last_check(product: str, component: str, last_check: str) -> None: |
| key = f'query_{product}_{component}' |
| return db_store_meta_value(key, last_check) |
| |
| |
| def db_get_msgid_by_bid_cid(bid: int, cid: Optional[int]) -> str: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine) |
| if cid: |
| q = sa.select([t_bmap.c.message_id]).where(t_bmap.c.bug_id == bid, t_bmap.c.comment_id == cid) |
| else: |
| # If cid is not defined, we get all mappings and use the lowest cid |
| q = sa.select([t_bmap.c.message_id]).where(t_bmap.c.bug_id == bid).order_by(t_bmap.c.comment_id) |
| |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if len(fa): |
| logger.debug('query results for bid=%s, cid=%s: %s', bid, cid, fa) |
| return fa[0][0] |
| |
| raise LookupError('No message-id matching bid=%s, cid=%s' % (bid, cid)) |
| |
| |
| def db_get_bid_cid_by_msgid(msgid: str) -> Tuple[int, Optional[int]]: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine) |
| logger.debug('Querying db for msgid=%s', msgid) |
| q = sa.select([t_bmap.c.bug_id, t_bmap.c.comment_id]).where(t_bmap.c.message_id == msgid) |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if not len(fa): |
| raise LookupError('msgid %s not known' % msgid) |
| bid, cid = fa[0] |
| dbid = bz_dedupe_bid(bid) |
| if dbid != bid: |
| cid = None |
| logger.debug(' matching bid=%s, cid=%s', bid, cid) |
| return bid, cid |
| |
| |
| def db_store_msgid_bid_cid(msgid: str, bid: int, cid: int) -> None: |
| msgid = msgid.strip('<>') |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine) |
| q = sa.insert(t_bmap).values(message_id=msgid, bug_id=bid, comment_id=cid) |
| dbconn.execute(q) |
| logger.info('Created new mapping for %s: %s/%s', msgid, bid, cid) |
| |
| |
| def db_get_recipients(bid: int) -> Set[str]: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_recip = sa.Table('recipients', md, autoload=True, autoload_with=engine) |
| logger.debug('Querying recipients for bid=%s', bid) |
| q = sa.select([t_recip.c.email]).where(t_recip.c.bug_id == bid) |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if not len(fa): |
| raise LookupError('bid %s not known' % bid) |
| return set(x[0] for x in fa) |
| |
| |
| def db_get_bugs_for_commit(csha: str) -> Set[int]: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_cmap = sa.Table('commit_bug_mapping', md, autoload=True, autoload_with=engine) |
| logger.debug('Querying for commit_id=%s', csha) |
| q = sa.select([t_cmap.c.bug_id]).where(t_cmap.c.commit_id == csha) |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if not len(fa): |
| return set() |
| |
| return set(x[0] for x in fa) |
| |
| |
| def db_store_bug_for_commit(csha: str, bid: int) -> None: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_cmap = sa.Table('commit_bug_mapping', md, autoload=True, autoload_with=engine) |
| logger.debug('Storing bug_id=%s for commit_id=%s', bid, csha) |
| q = sa.insert(t_cmap).values(bug_id=bid, commit_id=csha) |
| dbconn.execute(q) |
| |
| |
| def db_store_recipients(bid: int, recipients: Set[str]) -> None: |
| # TODO: add ability to unsubscribe? |
| try: |
| stored = db_get_recipients(bid) |
| except LookupError: |
| stored = set() |
| # Any new ones to store? |
| extras = recipients - stored |
| if not extras: |
| return |
| |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_recip = sa.Table('recipients', md, autoload=True, autoload_with=engine) |
| logger.debug('Storing new recipients for bid=%s', bid) |
| for addr in extras: |
| q = sa.insert(t_recip).values(bug_id=bid, email=addr) |
| dbconn.execute(q) |
| logger.debug(' Added %s', addr) |
| |
| |
| def db_get_meta_value(key: str) -> str: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_meta = sa.Table('meta', md, autoload=True, autoload_with=engine) |
| q = sa.select([t_meta.c.var_value]).where(t_meta.c.var_key == key) |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if not len(fa): |
| raise LookupError('meta key %s not known' % key) |
| return fa[0][0] |
| |
| |
| def db_store_meta_value(key: str, value: str) -> None: |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_meta = sa.Table('meta', md, autoload=True, autoload_with=engine) |
| with engine.begin() as dbconn: |
| q = sa.delete(t_meta).where(t_meta.c.var_key == key) |
| dbconn.execute(q) |
| q = sa.insert(t_meta).values(var_key=key, var_value=value) |
| dbconn.execute(q) |
| |
| |
| def db_store_notify_last_check(bid: int, when: str): |
| key = f'notify_bug_{bid}' |
| return db_store_meta_value(key, when) |
| |
| |
| def db_get_notify_last_check(bid: int) -> str: |
| key = f'notify_bug_{bid}' |
| return db_get_meta_value(key) |
| |
| |
| def bz_get_changed_bugs(since: str, include_untracked: bool = False) -> List: |
| logger.debug('Querying for changed bugs since %s', since) |
| params = { |
| 'chfieldfrom': since, |
| 'include_fields': 'id,summary,product,component', |
| } |
| rdata = bz_rest('bug', params=params) |
| if include_untracked: |
| return rdata['bugs'] |
| bids = tuple([x['id'] for x in rdata['bugs']]) |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine) |
| q = sa.select([t_bmap.c.bug_id]).where(t_bmap.c.bug_id.in_(bids)).distinct() |
| rp = dbconn.execute(q) |
| fa = rp.fetchall() |
| if not fa: |
| return list() |
| tracked = set([x[0] for x in fa]) |
| bugs = list() |
| for bdata in rdata['bugs']: |
| if bdata['id'] in tracked: |
| bugs.append(bdata) |
| |
| return bugs |
| |
| |
| def bz_quicksearch_bugs(query: str) -> Dict: |
| params = { |
| 'include_fields': 'id,summary,product,component', |
| 'quicksearch': query, |
| } |
| return bz_rest('bug', params=params) |
| |
| |
| def bz_get_query_bugs(params: Dict, exclude: Set[int]) -> List[int]: |
| if 'include_fields' not in params: |
| params['include_fields'] = 'id,summary,product,component' |
| rdata = bz_rest('bug', params=params) |
| bids = list() |
| for bdata in rdata['bugs']: |
| if bdata['id'] in exclude: |
| continue |
| bids.append(bdata['id']) |
| return bids |
| |
| |
| def get_human_size(size: int, decimals: int = 2) -> str: |
| units = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] |
| while True: |
| unit = units.pop(0) |
| if size < 1024.0 or not len(units): |
| break |
| size /= 1024.0 |
| |
| return f'{size:.{decimals}f} {unit}' |
| |
| |
| def bz_get_attachment_by_aid(aid: int, include_fields: Optional[str] = None) -> Dict: |
| if include_fields is None: |
| rdata = bz_rest(f'bug/attachment/{aid}') |
| else: |
| params = { |
| 'include_fields': include_fields, |
| } |
| rdata = bz_rest(f'bug/attachment/{aid}', params=params) |
| |
| for raid, radata in rdata['attachments'].items(): |
| if int(raid) == aid: |
| if 'size' in radata: |
| radata['human_size'] = get_human_size(radata['size']) |
| return radata |
| |
| raise LookupError('No matching attachment_id found: %s' % aid) |
| |
| |
| def bz_get_all_comments_by_bid(bid: int) -> List[Dict]: |
| bdata = bz_rest(f'bug/{bid}/comment') |
| for rbid, rbdata in bdata['bugs'].items(): |
| if int(rbid) != bid: |
| continue |
        return rbdata['comments']
    return list()
| |
| |
| def bz_get_comments_for_bid_since(bid: int, since: str) -> List[Dict]: |
| params = { |
| 'new_since': since, |
| 'id': bid, |
| } |
| bdata = bz_rest(f'bug/{bid}/comment', params=params) |
| for rbid, rbdata in bdata['bugs'].items(): |
| if int(rbid) != bid: |
| continue |
        return rbdata['comments']
    return list()
| |
| |
| def bz_get_newest_comments_for_bid(bid: int, include_private: bool = False) -> List[Dict]: |
| try: |
| when = db_get_notify_last_check(bid) |
| except LookupError: |
| # grab the highest cid we know about |
| engine, dbconn = db_get_sa() |
| md = sa.MetaData() |
| t_bmap = sa.Table('msgid_bug_mapping', md, autoload=True, autoload_with=engine) |
        q = sa.select([sa.func.max(t_bmap.c.comment_id)]).where(t_bmap.c.bug_id == bid)
| rp = dbconn.execute(q) |
| mfa = rp.fetchall() |
| c_max = None |
| if mfa: |
| c_max = mfa[0][0] |
| cdatas = bz_get_all_comments_by_bid(bid) |
| comments = list() |
| for cdata in cdatas: |
| if c_max and cdata['id'] <= c_max: |
| continue |
| if not include_private and cdata['is_private']: |
| continue |
| comments.append(cdata) |
| return comments |
| |
| logger.debug('Getting newest comments since %s', when) |
| cdatas = bz_get_comments_for_bid_since(bid, when) |
| comments = list() |
| for cdata in cdatas: |
| if not include_private and cdata['is_private']: |
| continue |
| comments.append(cdata) |
| return comments |
| |
| |
| def msg_get_inre_msgids(msg: email.message.EmailMessage) -> List[str]: |
| pairs = list() |
| if msg.get('In-Reply-To'): |
| pairs += email.utils.getaddresses([str(x) for x in msg.get_all('in-reply-to', [])]) |
| if msg.get('References'): |
| pairs += email.utils.getaddresses([str(x) for x in msg.get_all('references', [])]) |
| |
| msgids = list() |
| for pair in pairs: |
| if pair[1].strip() and pair[1] not in msgids: |
| msgids.append(pair[1]) |
| |
| logger.debug('msgids=%s', str(msgids)) |
| return msgids |
| |
| |
def sort_msgs_by_received(msgs: List[email.message.EmailMessage]) -> List[email.message.EmailMessage]:
    tosort = list()
    for msg in msgs:
        latest_rts = None
        for rhdr in msg.get_all('Received', list()):
            # The received headers are pretty free-form, but generally will end with ; datetimeinfo
            chunks = rhdr.rsplit(';', maxsplit=1)
            if len(chunks) < 2:
                continue
            rdate = chunks[1].strip()
            rdt = email.utils.parsedate_tz(rdate)
            if rdt is None:
                continue
            # Convert to a UTC timestamp, so all sort keys are directly comparable
            rts = email.utils.mktime_tz(rdt)
            # use the latest timestamp we find
            if not latest_rts or latest_rts < rts:
                latest_rts = rts
        if not latest_rts and msg.get('Date'):
            # Use the Date header as fallback
            ddt = email.utils.parsedate_tz(msg.get('Date'))
            if ddt is not None:
                latest_rts = email.utils.mktime_tz(ddt)
        if not latest_rts:
            logger.debug('Message without a date: %s!', msg.get('Message-ID'))
            continue
        tosort.append((latest_rts, msg))

    sortedmsgs = list()
    for rts, msg in sorted(tosort, key=lambda x: x[0]):
        sortedmsgs.append(msg)

    return sortedmsgs
| |
| |
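# Map a reply back to an already-tracked bug/comment by looking up each of its
# In-Reply-To/References message-ids in the local database.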
| def msg_get_inre_bid_cid(msg: email.message.EmailMessage) -> Tuple[int, int]: |
| msgids = msg_get_inre_msgids(msg) |
| if not msgids: |
| logger.debug('No references in msg') |
| raise LookupError('No references found') |
| |
| res = list() |
| for msgid in msgids: |
| logger.debug('Trying to match msgid=%s', msgid) |
| try: |
| (bid, cid) = db_get_bid_cid_by_msgid(msgid) |
| logger.debug('Matched msgid=%s to bid=%s, cid=%s', msgid, bid, cid) |
| res.append((bid, cid)) |
| except LookupError: |
| logger.debug('No match for msgid=%s', msgid) |
| # Try the next one until we run out |
| pass |
| if not res: |
| logger.debug('No reference matched database records') |
| raise LookupError('Nothing matching in the db') |
| |
| if len(res) > 1: |
| # Use the highest cid, which is going to be the latest matching comment |
| res.sort(key=lambda x: x[1], reverse=True) |
| return res[0] |
| |
| |
| def msg_get_recipients(msg: email.message.EmailMessage) -> Set[str]: |
| pairs = list() |
| if msg.get('To'): |
| pairs += email.utils.getaddresses([str(x) for x in msg.get_all('to', [])]) |
| if msg.get('Cc'): |
| pairs += email.utils.getaddresses([str(x) for x in msg.get_all('cc', [])]) |
| if msg.get('From'): |
| pairs += email.utils.getaddresses([str(x) for x in msg.get_all('from', [])]) |
| |
| return set([x[1].lower() for x in pairs]) |
| |
| |
| def bz_get_product_component_by_bid(bid: int) -> Tuple[str, str]: |
| bdata = bz_get_bug(bid, resolve_dupes=True) |
| return bdata['product'], bdata['component'] |
| |
| |
| def get_product_component_by_recipients(recipients: Set[str]) -> Tuple[str, str]: |
| config = get_config() |
| if not config.get('components'): |
| logger.debug('No components found in config') |
| raise LookupError('No components defined in config') |
| for recipient in recipients: |
| logger.debug('Matching %s', recipient) |
| for bz_product, bz_components in config['components'].items(): |
| for bz_component, c_config in bz_components.items(): |
| if not c_config.get('recipients'): |
| continue |
| for addr in c_config['recipients']: |
| if fnmatch(recipient, addr): |
| logger.debug('Matched %s with product=%s, component=%s', recipient, bz_product, bz_component) |
| return bz_product, bz_component |
| |
| raise LookupError('No matches for any recipients') |
| |
| |
| def get_newbug_payload_by_product_component(product: str, component: str) -> Dict: |
| config = get_config() |
| try: |
| payload = config['components'][product][component]['payload'] |
| except KeyError: |
| payload = dict() |
| payload['product'] = product |
| payload['component'] = component |
| return payload |
| |
| |
| def msg_get_author(msg: email.message.EmailMessage) -> Tuple[str, str]: |
| author = ('', 'missing@address.local') |
| fh = msg.get('from') |
| if fh: |
| author = email.utils.getaddresses([fh])[0] |
| |
| if not author[0]: |
| return 'Zorro Boogs', author[1] |
| |
| return author |
| |
| |
| def msg_get_payload(msg: email.message.EmailMessage, strip_quoted: bool = False, |
| strip_signature: bool = True) -> str: |
| mp = msg.get_body(preferencelist=('plain',)) |
| bbody = mp.get_payload(decode=True) |
| cs = mp.get_content_charset() |
| if not cs: |
| cs = 'utf-8' |
| cpay = bbody.decode(cs, errors='replace') |
| if strip_signature: |
| # Strip signature if we find it |
| chunks = cpay.rsplit('\n-- \n', maxsplit=1) |
| cbody = chunks[0] |
| else: |
| cbody = cpay |
| |
| if not strip_quoted: |
| return cbody |
| |
| stripped = list() |
| for line in cbody.splitlines(): |
| if not line.startswith('> '): |
| stripped.append(line) |
| return '\n'.join(stripped) |
| |
| |
| def msg_parse_for_bug(msg: email.message.EmailMessage) -> Tuple[str, Tuple[str, str], str, str, List[Dict]]: |
| msgid = b4.LoreMessage.get_clean_msgid(msg) |
| cbody = msg_get_payload(msg) |
| lsub = b4.LoreSubject(msg.get('Subject', '')) |
| subject = lsub.subject |
| atts = msg_get_valid_attachments(msg) |
| author = msg_get_author(msg) |
| |
| return msgid, author, subject, cbody, atts |
| |
| |
| def msg_get_valid_attachments(msg: email.message.EmailMessage) -> List[Dict]: |
| # Get all good attachments |
| config = get_config() |
| atts = list() |
| for part in msg.walk(): |
| if part.get_content_disposition() != 'attachment': |
| continue |
| ct = part.get_content_type() |
| mts = config.get('mimetypes') |
| allowed = True |
| if mts and 'deny' in mts: |
| for dmt in mts['deny']: |
| if fnmatch(ct, dmt): |
                    logger.debug('Skipping denied mime-type attachment: %s', ct)
| allowed = False |
| break |
| if allowed: |
| databytes = part.get_payload(decode=True) |
| data = base64.b64encode(databytes).decode() |
| filename = part.get_filename() |
| if not filename: |
| filename = 'unnamed.txt' |
| ct = 'text/plain' |
| summary = filename |
| payload = { |
| 'file_name': filename, |
| 'content_type': ct, |
| 'summary': summary, |
| 'data': data, |
| } |
| |
| atts.append(payload) |
| |
| return atts |
| |
| |
| def get_recipients_by_product_component(product: str, component: str) -> Set[str]: |
| recip = set() |
| config = get_config() |
| try: |
| recip.update(config['notify']['alwayscc']) |
| logger.debug('added global alwayscc: %s', config['notify']['alwayscc']) |
| except KeyError: |
| pass |
| try: |
| recip.update(config['components'][product][component]['alwayscc']) |
| logger.debug('added %s/%s alwayscc: %s', product, component, |
| config['components'][product][component]['alwayscc']) |
| except KeyError: |
| pass |
| if not recip: |
| try: |
| recip.update(config['components'][product][component]['recipients']) |
| logger.debug('added %s/%s fallback recipients: %s', product, component, |
| config['components'][product][component]['recipients']) |
| except KeyError: |
| pass |
| |
| return recip |
| |
| |
| def get_maintainer_data() -> Dict[str, Dict[str, set]]: |
| global MAINT_CACHE |
| if not len(MAINT_CACHE): |
| config = get_config() |
| ses = get_requests_session() |
| murl = config['bugzilla']['maintainers_url'] |
| if murl.startswith('file://'): |
| with open(murl.replace('file://', ''), 'r') as fh: |
| mdata = fh.read() |
| else: |
| res = ses.get(config['bugzilla']['maintainers_url']) |
| res.raise_for_status() |
| mdata = res.text |
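        # Parse MAINTAINERS-style records: the non-indented line preceding the first
        # M:/R:/L: entry becomes the subsystem title, and addresses are collected
        # into per-subsystem M (maintainer), R (reviewer) and L (list) sets.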
| lookfor = ['M:', 'R:', 'L:'] |
| prevline = None |
| cur_sub = None |
| for line in mdata.splitlines(): |
| if len(line) < 2 or not len(line[0].strip()): |
| cur_sub = None |
| continue |
| if line[:2] not in lookfor: |
| prevline = line |
| continue |
| if not cur_sub: |
| cur_sub = prevline |
| addr = email.utils.parseaddr(line[2:]) |
| if cur_sub: |
| if cur_sub not in MAINT_CACHE: |
| MAINT_CACHE[cur_sub] = {'M': set(), 'R': set(), 'L': set()} |
| MAINT_CACHE[cur_sub][line[0]].add(addr[1]) |
| |
| return MAINT_CACHE |
| |
| |
| def get_recipients_by_subsystem(subsystem: str, limit: Optional[Set[str]] = None) -> Set[str]: |
| mdata = get_maintainer_data() |
| recips = set() |
| if subsystem in mdata: |
| for mtype, mset in mdata[subsystem].items(): |
| if limit and mtype not in limit: |
| continue |
| recips.update(mset) |
| return recips |
| |
| |
| def get_privacy_mode(product: str, component: str) -> bool: |
| config = get_config() |
| if config['bugzilla'].get('privacy_mode', False): |
| return True |
| cconf = get_component_config(product, component) |
| return cconf.get('bz_privacy_mode', False) |
| |
| |
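# Assemble the full notification recipient set for a bug: db-stored subscribers,
# configured alwayscc/recipient addresses, the bug's cc/assignee/creator (unless
# privacy mode is on), plus subsystem addresses, minus any "neverto" masks.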
| def get_bug_recipients(bid: int) -> Set[str]: |
| # Get all db-stored recipients |
| # TODO: implement "onlyto" |
| allrecip = set() |
| try: |
| allrecip.update(db_get_recipients(bid)) |
| except LookupError: |
| logger.debug('No in-database recipients for bid=%s', bid) |
| # Now get all bug cc recipients |
| bdata = bz_get_bug(bid, resolve_dupes=True) |
| product = bdata['product'] |
| component = bdata['component'] |
| config = get_config() |
| bugr = get_recipients_by_product_component(product, component) |
| if get_privacy_mode(product, component): |
| logger.debug('Privacy mode on, not adding addresses from the bug') |
| else: |
| bugr.update(bdata['cc']) |
| bugr.add(bdata['assigned_to']) |
| bugr.add(bdata['creator']) |
| |
| allrecip.update(bugr) |
| subsystem_cf = config['bugzilla'].get('subsystem_cf') |
| if subsystem_cf: |
| subsystem = bdata.get(subsystem_cf) |
| if not subsystem: |
| # Is there a default_subsystem defined for this product/component? |
| cconf = get_component_config(product, component) |
| subsystem = cconf.get('default_subsystem') |
| if subsystem: |
| subr = get_recipients_by_subsystem(subsystem) |
| allrecip.update(subr) |
| |
| # Remove "neverto" addresses |
| for mask in config['notify'].get('neverto', list()): |
| for addr in set(allrecip): |
| if fnmatch(addr, mask): |
| logger.debug('Removed %s because it matched neverto=%s', addr, mask) |
| allrecip.remove(addr) |
| return allrecip |
| |
| |
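# Generate a message-id of the form <YYYYMMDD-b{bug}[c{count}]-{random}@bugzilla-host>
# so outgoing notifications can be traced back to a specific bug/comment.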
| def make_msgid(bid: int, cid: Optional[int]) -> str: |
| config = get_config() |
| bzurl = config['bugzilla']['url'] |
| bzloc = urllib.parse.urlparse(bzurl) |
| slug = f'b{bid}' |
| if bid and cid: |
| count = bz_get_count_by_bid_cid(bid, cid) |
| slug += f'c{count}' |
| |
| msgid = '<%s-%s-%s@%s>' % (datetime.date.today().strftime('%Y%m%d'), slug, uuid.uuid4().hex[:12], bzloc.netloc) |
| return msgid |
| |
| |
| def get_msgid(msg: email.message.EmailMessage) -> str: |
| return b4.LoreMessage.get_clean_msgid(msg) |
| |
| |
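# Fill in any missing headers on the outgoing notification (From, To, Message-ID,
# In-Reply-To/References, Subject, X-Bugzilla-*) and hand it off for delivery,
# either via a configured SMTP server or via b4's own SMTP handling.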
| def notify_bug(bid: int, cid: Optional[int], msg: email.message.EmailMessage, inre_cid: Optional[int] = None, |
| dry_run: bool = False) -> Optional[str]: |
| bdata = bz_get_bug(bid, resolve_dupes=True) |
| config = get_config() |
| if not msg.get('From'): |
| msg['From'] = config['notify'].get('fromaddr') |
| if not msg.get('To'): |
| recipients = get_bug_recipients(bid) |
| if not recipients: |
| logger.info('No recipients for bug %s, not notifying', bid) |
| return None |
| msg['To'] = b4.format_addrs([('', x) for x in recipients]) |
| if not msg.get('Message-ID'): |
| msg['Message-ID'] = make_msgid(bid, cid) |
| if not msg.get('In-Reply-To'): |
| inre_msgid = None |
| try: |
| inre_msgid = db_get_msgid_by_bid_cid(bid, inre_cid) |
| except LookupError: |
| logger.debug('Could not find msgid matching bid=%s, cid=%s', bid, inre_cid) |
| # Find anything |
| try: |
| inre_msgid = db_get_msgid_by_bid_cid(bid, None) |
| except LookupError: |
| pass |
| if inre_msgid: |
| msg['In-Reply-To'] = f'<{inre_msgid}>' |
| msg['References'] = f'<{inre_msgid}>' |
| if not msg.get('Subject'): |
| msg['Subject'] = 'Re: %s' % bdata['summary'] |
| else: |
| msg['Subject'] = bdata['summary'] |
| |
| msg['X-Bugzilla-Product'] = bdata['product'] |
| msg['X-Bugzilla-Component'] = bdata['component'] |
| msg['X-Mailer'] = f'{__APPNAME__} {__VERSION__}' |
| # Should we add other X-B headers? |
| # If we have notify.smtpserver and notify.smtpport defined, use that, |
| # otherwise use b4's get_smtp method |
| smtpserver = config['notify'].get('smtpserver') |
| smtpport = int(config['notify'].get('smtpport', '25')) |
| if smtpserver and smtpport: |
| import smtplib |
| smtp = smtplib.SMTP(smtpserver, smtpport) |
| else: |
| smtp, fromaddr = b4.get_smtp(dryrun=dry_run) |
| b4.send_mail(smtp, [msg], fromaddr=config['notify'].get('fromaddr'), dryrun=dry_run) |
| |
| return msg.get('Message-ID') |
| |
| |
| def add_bot_signature(body: str) -> str: |
| config = get_config() |
| sigtpt = Template(config['templates'].get('botsig')) |
| sigvals = { |
| 'myname': config['bugzilla'].get('name'), |
| 'appname': __APPNAME__, |
| 'appver': __VERSION__, |
| } |
| body += sigtpt.safe_substitute(sigvals) |
| return body |
| |
| |
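# public-inbox endpoints return gzipped mboxes; decompress the response and let b4
# split and dedupe it into individual messages.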
| def pi_get_mbox(resp) -> List[email.message.EmailMessage]: |
| if resp.status_code != 200: |
| raise LookupError('Server returned an error: %s' % resp.status_code) |
| t_mbox = gzip.decompress(resp.content) |
| resp.close() |
| if not len(t_mbox): |
| raise LookupError('mbox is empty') |
| # noinspection PyTypeChecker |
| return b4.split_and_dedupe_pi_results(t_mbox) |
| |
| |
| def pi_get_query_results(query_url: str) -> List[email.message.EmailMessage]: |
| loc = urllib.parse.urlparse(query_url) |
| logger.debug('query=%s', query_url) |
| logger.debug('grabbing search results from %s', loc.netloc) |
| session = get_requests_session() |
| # For the query to retrieve a mbox file, we need to send a POST request |
| resp = session.post(query_url, data='') |
| return pi_get_mbox(resp) |
| |
| |
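# Fetch new messages in a thread from a public-inbox server; when 'since' is given,
# use public-inbox's "dt:SINCE.." date-range search, otherwise grab the whole
# t.mbox.gz thread and reduce it to the strict thread for the message-id.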
| def pi_get_new_since(url, msgid, since: str) -> List[email.message.EmailMessage]: |
| loc = urllib.parse.urlparse(url) |
| logger.debug('grabbing thread from %s', loc.netloc) |
| mbox_url = url.rstrip('/') + '/' + urllib.parse.quote_plus(msgid) |
| session = get_requests_session() |
| if since: |
| mbox_url += '/?x=m&q=' + urllib.parse.quote_plus(f'dt:{since}..') |
| return pi_get_query_results(mbox_url) |
| |
| mbox_url += '/t.mbox.gz' |
| logger.debug('mbox_url=%s', mbox_url) |
| resp = session.get(mbox_url) |
| msgs = pi_get_mbox(resp) |
| # for whole threads, return a strict thread |
| return b4.get_strict_thread(msgs, msgid) |
| |
| |
| def pi_get_sorted_thread(url: str, msgid: str, since: Optional[str] = None) -> List[email.message.EmailMessage]: |
| msgs = pi_get_new_since(url, msgid, since) |
| if not msgs: |
| raise LookupError('No new messages matching url=%s, msgid=%s, since=%s' % (url, msgid, since)) |
| return sort_msgs_by_received(msgs) |
| |
| |
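# Lazily create the SQLAlchemy engine/connection; for SQLite backends the schema is
# created automatically on first use (see db_init_sa_sqlite_db below).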
| def db_get_sa() -> Tuple[sa.engine.Engine, sa.engine.Connection]: |
| global SACONN, SAENGINE |
| if SACONN is None: |
| config = get_config() |
        try:
            dburl = config['db']['dburl']
        except KeyError:
            # we may pass it via env, which lets us better manage secrets
            dburl = os.getenv('DBURL')
        if not dburl:
            raise LookupError('CRITICAL: db.dburl not set in config file (or DBURL in the environment)')

        db_pool_recycle = int(config.get('db', dict()).get('dbpoolrecycle', '300'))
| SAENGINE = sa.create_engine(dburl, pool_recycle=db_pool_recycle) |
| SACONN = SAENGINE.connect() |
| if SAENGINE.driver == 'pysqlite': |
| md = sa.MetaData() |
| try: |
| t_meta = sa.Table('meta', md, autoload=True, autoload_with=SAENGINE) |
| q = sa.select([t_meta.c.var_value]).where(t_meta.c.var_key == 'schema') |
| rp = SACONN.execute(q) |
| dbver = rp.fetchone()[0] |
| # Future logic to upgrade database here |
| logger.debug('dbver=%s', dbver) |
| except NoSuchTableError: |
| db_init_sa_sqlite_db(SAENGINE, SACONN) |
| |
| return SAENGINE, SACONN |
| |
| |
| def db_init_sa_sqlite_db(engine: sa.engine.Engine, dbconn: sa.engine.Connection): |
| logger.info('Setting up SQLite database') |
| md = sa.MetaData() |
| meta = sa.Table('meta', md, |
| sa.Column('var_key', sa.Text()), |
| sa.Column('var_value', sa.Text()), |
| ) |
| sa.Index('idx_meta_key_value', meta.c.var_key, meta.c.var_value, unique=True) |
| bmap = sa.Table('msgid_bug_mapping', md, |
| sa.Column('row_id', sa.Integer(), primary_key=True), |
| sa.Column('bug_id', sa.Integer(), nullable=False), |
| sa.Column('comment_id', sa.Integer(), nullable=False), |
| sa.Column('message_id', sa.Text(), nullable=False), |
| ) |
| sa.Index('idx_msgid_bugid_commentid', bmap.c.message_id, bmap.c.bug_id, bmap.c.comment_id, unique=True) |
| sa.Index('idx_msgid_commentid', bmap.c.message_id, bmap.c.comment_id, unique=True) |
| sa.Index('idx_msgid_bugid', bmap.c.message_id, bmap.c.bug_id, unique=True) |
| |
| cmap = sa.Table('commit_bug_mapping', md, |
| sa.Column('row_id', sa.Integer(), primary_key=True), |
| sa.Column('bug_id', sa.Integer(), nullable=False), |
| sa.Column('commit_id', sa.Text(), nullable=False), |
| ) |
| # a commit can close multiple bugs |
| sa.Index('idx_commitid_bugid', cmap.c.commit_id, cmap.c.bug_id, unique=False) |
| recip = sa.Table('recipients', md, |
| sa.Column('row_id', sa.Integer(), primary_key=True), |
| sa.Column('bug_id', sa.Integer(), nullable=False), |
| sa.Column('email', sa.Text(), nullable=False), |
| ) |
| sa.Index('idx_bugid_email', recip.c.bug_id, recip.c.email, unique=True) |
| md.create_all(engine) |
| q = sa.insert(meta).values(var_key='schema', var_value=str(__DBSCHEMA__)) |
| dbconn.execute(q) |