| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2023 by the Linux Foundation |
| |
| import argparse |
| import bugspray |
| import datetime |
| import time |
| import re |
| import b4 |
| |
| import email.message |
| import email.utils |
| |
| from fnmatch import fnmatch |
| |
| logger = bugspray.logger |
| |
| |
def _first_matching_mask(value: str, masks):
    """Return the first glob mask in *masks* that matches *value*, or None."""
    for mask in masks:
        if fnmatch(value, mask):
            return mask
    return None


def process_new_comments(bid: int, privacy_mode: bool = False, dry_run: bool = False) -> None:
    """Send a notification for every not-yet-notified comment on a bug.

    Comments are fetched with ``bugspray.bz_get_newest_comments_for_bid``. A
    comment is skipped when a message-id is already recorded for it, when its
    creator matches a ``never_if_creator`` glob, when its text matches a
    ``never_if_text_matches`` glob, or when it carries an attachment that is
    marked private.

    Args:
        bid: Bugzilla bug id whose comments should be processed.
        privacy_mode: When true, the comment author's address is left out of
            the template values, no Reply-To header is set, and the author is
            not stored as a recipient.
        dry_run: Passed through to ``bugspray.notify_bug``; when true, nothing
            is written to the database by this function.
    """
    config = bugspray.get_config()
    cdatas = bugspray.bz_get_newest_comments_for_bid(bid)
    # Single base timestamp for the whole batch; see the Date: header below.
    msgts = int(time.time())
    inre_re = re.compile(r'\(In reply to.*from comment #(\d+)', flags=re.I)

    for cdata in cdatas:
        cid = cdata['id']
        creator = cdata['creator']

        # Check if we've already notified about this comment
        try:
            bugspray.db_get_msgid_by_bid_cid(bid, cid)
        except LookupError:
            pass
        else:
            logger.debug('Skipping, msgid match for bid=%s, cid=%s', bid, cid)
            continue

        # Check if the creator is in never_if_creator
        mask = _first_matching_mask(creator, config['notify'].get('never_if_creator', []))
        if mask is not None:
            logger.debug('Skipping cid=%s because it matched never_if_creator=%s', cid, mask)
            continue

        # Check if text is in never_if_text_matches
        mask = _first_matching_mask(cdata['text'], config['notify'].get('never_if_text_matches', []))
        if mask is not None:
            logger.debug('Skipping cid=%s because it matched never_if_text_matches=%s', cid, mask)
            continue

        clines = cdata['text'].strip().splitlines()
        inre_cid = None

        comment_user = bugspray.bz_get_user(creator)
        # Fall back to the local part of the address when no real name is set
        comment_author_name = comment_user.get('real_name') or creator.split('@')[0]
        if privacy_mode:
            comment_author = comment_author_name
            logger.debug('Privacy mode, comment_author=%s', comment_author)
        else:
            comment_author = f'{comment_author_name} <{creator}>'

        bug_url = config['bugzilla'].get('bugmask', '').format(bug_id=bid)
        bodyvals = {
            'bzname': config['bugzilla'].get('name'),
            'bug_url': bug_url,
            'comment_url': bug_url + f"#c{cdata['count']}",
            'comment_author': comment_author,
        }

        attachment_id = cdata['attachment_id']
        if attachment_id:
            logger.info('Processing new attachment for bug_id=%s, comment_id=%s', bid, cid)
            adata = bugspray.bz_get_attachment_by_aid(
                attachment_id,
                include_fields='file_name,size,content_type,summary,is_patch,is_private',
            )
            if adata['is_private']:
                logger.debug('Skipping attachment marked private')
                continue
            bodytpt = bugspray.get_template_by_bid('new_attachment_notify', bid)
            # Expose the attachment fields to the template as well
            bodyvals.update(adata)
            bodyvals['attachment_url'] = config['bugzilla'].get('attachmask', '').format(
                attachment_id=attachment_id)
        else:
            logger.info('Processing new comment for bug_id=%s, comment_id=%s', bid, cid)
            bodytpt = bugspray.get_template_by_bid('new_comment_notify', bid)

        # A reply header, if any, is on the first line. An empty or
        # whitespace-only comment has no lines at all, so guard the lookup
        # instead of raising IndexError on clines[0].
        matches = inre_re.search(clines[0]) if clines else None
        if matches:
            inre_count = int(matches.group(1))
            try:
                inre_cid = bugspray.bz_get_cid_by_bid_count(bid, inre_count)
            except LookupError:
                # Referenced comment is unknown; send without threading info
                pass
        bodyvals['comment_text'] = '\n'.join(clines)

        msg = email.message.EmailMessage()
        conf_name, conf_addr = email.utils.parseaddr(config['notify'].get('fromaddr'))
        msg['From'] = b4.format_addrs([(f'{comment_author_name} via {conf_name}', conf_addr)])
        if not privacy_mode:
            msg['Reply-To'] = b4.format_addrs([(comment_author_name, creator)])
        # A little hack to make sure Date: is consecutive for threads
        try:
            msg['Date'] = email.utils.formatdate(msgts + int(cdata['count']), localtime=True)
        except ValueError:
            # Best effort only: the message simply goes out without our Date:
            logger.debug('Could not set Date for bid=%s, cid=%s', bid, cid)

        body = bodytpt.safe_substitute(bodyvals)
        body = bugspray.add_bot_signature(body)
        msg.set_payload(body, charset='utf-8')

        msgid = bugspray.notify_bug(bid, cid, msg, inre_cid=inre_cid, dry_run=dry_run)
        if msgid and not dry_run:
            bugspray.db_store_msgid_bid_cid(msgid, bid, cid)
            if not privacy_mode:
                bugspray.db_store_recipients(bid, {creator})
            # TODO: This assumes that comments are always in incremental order
            lastcheck = cdata['creation_time'].replace('T', ' ').rstrip('Z')
            bugspray.db_store_notify_last_check(bid, lastcheck)
| |
| |
def main(cmdargs: argparse.Namespace) -> None:
    """Notify about changed bugs, then about bugs matching per-component quicksearches.

    The starting point is the ``notify_last_run`` meta value; when it is not
    stored yet, the past hour is used. Unless ``cmdargs.dry_run`` is set, the
    time this run started is stored as the new ``notify_last_run``.

    Args:
        cmdargs: Parsed command-line arguments; only ``dry_run`` is read here.
    """
    timefmt = '%Y-%m-%d %H:%M:%S'
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # the formatted string is identical.
    now = datetime.datetime.now(datetime.timezone.utc)
    lastrun = now.strftime(timefmt)
    try:
        # Get all new bugs that changed since last run
        since = bugspray.db_get_meta_value('notify_last_run')
    except LookupError:
        logger.debug('Got a LookupError, getting everything for the past hour')
        # Assume it's the first run and get changes for the past hour
        since = (now - datetime.timedelta(hours=1)).strftime(timefmt)

    # first, process all changed bugs that we're tracking
    logger.info('Getting a list of changed bugs since %s', since)
    buglist = bugspray.bz_get_changed_bugs(since)
    # Bug ids already handled in this run; excluded from the quicksearch queries
    seen = set()
    config = bugspray.get_config()
    if buglist:
        for bdata in buglist:
            bid = bdata['id']
            logger.debug('Looking at %s: %s', bid, bdata['summary'])
            privacy_mode = bugspray.get_privacy_mode(bdata['product'], bdata['component'])
            process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)
            seen.add(bid)
    else:
        logger.info('No changes to any tracked bugs')

    # Now go by product/component and handle new bug queries if defined
    for bz_product, bz_components in config['components'].items():
        for bz_component in bz_components:
            cconf = bugspray.get_component_config(bz_product, bz_component)
            qs = cconf.get('bz_new_bugs_quicksearch')
            if not qs:
                logger.debug('No quicksearch defined for %s/%s', bz_product, bz_component)
                continue
            logger.info('Querying matching quicksearch results since %s for %s/%s, qs=%s', since, bz_product,
                        bz_component, qs)
            params = {
                'chfieldfrom': since,
                'product': bz_product,
                'component': bz_component,
                'quicksearch': qs,
            }
            buglist = bugspray.bz_get_query_bugs(params, exclude=seen)
            # Privacy mode is on when set globally or for this component
            privacy_mode = bool(
                config['bugzilla'].get('privacy_mode', False) or cconf.get('bz_privacy_mode', False)
            )
            if not buglist:
                logger.info('No changed bugs matching these parameters.')
                continue
            logger.info('Processing %s matching quicksearch bugs', len(buglist))
            for bid in buglist:
                seen.add(bid)
                process_new_comments(bid, privacy_mode=privacy_mode, dry_run=cmdargs.dry_run)

    if not cmdargs.dry_run:
        bugspray.db_store_meta_value(key='notify_last_run', value=lastrun)