| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2023 by the Linux Foundation |
| |
| import argparse |
| import peebz |
| import datetime |
| import re |
| import b4 |
| |
| import email.message |
| |
| from fnmatch import fnmatch |
| |
| logger = peebz.logger |
| |
| |
| def process_new_comments(bid: int, dry_run: bool = False): |
| config = peebz.get_config() |
| cdatas = peebz.bz_get_newest_comments_for_bid(bid) |
| for cdata in cdatas: |
| # Check if we've already notified about this bug |
| cid = cdata['id'] |
| try: |
| peebz.db_get_msgid_by_bid_cid(bid, cid) |
| logger.debug('Skipping, msgid match for bid=%s, cid=%s', bid, cid) |
| continue |
| except LookupError: |
| pass |
| # Check if the creator is in never_if_creator |
| skip = False |
| for mask in config['notify'].get('never_if_creator', list()): |
| if fnmatch(cdata['creator'], mask): |
| logger.debug('Skipping cid=%s because it matched never_if_creator=%s', cid, mask) |
| skip = True |
| break |
| if skip: |
| continue |
| # Check if text is in never_if_text_matches |
| for mask in config['notify'].get('never_if_text_matches', list()): |
| if fnmatch(cdata['text'], mask): |
| logger.debug('Skipping cid=%s because it matched never_if_text_matches=%s', cid, mask) |
| skip = True |
| break |
| if skip: |
| continue |
| clines = cdata['text'].strip().splitlines() |
| inre_cid = None |
| bodyvals = { |
| 'bzname': config['bugzilla'].get('name'), |
| 'bug_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid), |
| 'comment_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid) + f"#c{cdata['count']}", |
| 'comment_author': cdata['creator'], |
| } |
| if cdata['attachment_id']: |
| logger.info('Processing new attachment for bug_id=%s, comment_id=%s', bid, cid) |
| adata = peebz.bz_get_attachment_by_aid( |
| cdata['attachment_id'], |
| include_fields='file_name,size,content_type,summary,is_patch,is_private', |
| ) |
| if adata['is_private']: |
| logger.debug('Skipping attachment marked private') |
| continue |
| bodytpt = peebz.get_template_by_bid('new_attachment_notify', bid) |
| bodyvals.update(adata) |
| bodyvals['attachment_url'] = config['bugzilla'].get('attachmask', '').format( |
| attachment_id=cdata['attachment_id']) |
| else: |
| logger.info('Processing new comment for bug_id=%s, comment_id=%s', bid, cid) |
| fline = clines[0] |
| matches = re.search(r'\(In reply to.*from comment #(\d+)', fline, flags=re.I) |
| if matches: |
| inre_count = int(matches.groups()[0]) |
| try: |
| inre_cid = peebz.bz_get_cid_by_bid_count(bid, inre_count) |
| except LookupError: |
| pass |
| bodyvals['comment_text'] = '\n'.join(clines) |
| bodytpt = peebz.get_template_by_bid('new_comment_notify', bid) |
| |
| msg = email.message.EmailMessage() |
| msg['Reply-To'] = b4.format_addrs([('', cdata['creator'])]) |
| body = bodytpt.safe_substitute(bodyvals) |
| body = peebz.add_bot_signature(body) |
| msg.set_payload(body, charset='utf-8') |
| msgid = peebz.notify_bug(bid, cid, msg, inre_cid=inre_cid, dry_run=dry_run) |
| if msgid and not dry_run: |
| peebz.db_store_msgid_bid_cid(msgid, bid, cid) |
| peebz.db_store_recipients(bid, {cdata['creator']}) |
| # TODO: This assumes that comments are always in incremental order |
| lastcheck = cdata['creation_time'].replace('T', ' ').rstrip('Z') |
| peebz.db_store_notify_last_check(bid, lastcheck) |
| |
| |
| def main(cmdargs: argparse.Namespace) -> None: |
| now = datetime.datetime.utcnow() |
| lastrun = now.strftime('%Y-%m-%d %H:%M:%S') |
| try: |
| # Get all new bugs that changed since last run |
| since = peebz.db_get_meta_value('notify_last_run') |
| except LookupError: |
| logger.debug('Got a LookupError, getting everything for the past hour') |
| # Assume it's the first run and get changes for the past hour |
| hourago = datetime.datetime.utcnow() - datetime.timedelta(hours=1) |
| since = hourago.strftime('%Y-%m-%d %H:%M:%S') |
| |
| # first, process all changed bugs that we're tracking |
| logger.info('Getting a list of changed bugs since %s', since) |
| buglist = peebz.bz_get_changed_bugs(since) |
| seen = set() |
| if buglist: |
| for bdata in buglist: |
| logger.debug('Looking at %s: %s', bdata['id'], bdata['summary']) |
| bid = bdata['id'] |
| seen.add(bid) |
| process_new_comments(bid, dry_run=cmdargs.dry_run) |
| else: |
| logger.info('No changes to any tracked bugs') |
| |
| # Now go by product/component and handle new bug queries if defined |
| config = peebz.get_config() |
| for bz_product, bz_components in config['components'].items(): |
| for bz_component in bz_components.keys(): |
| cconf = peebz.get_component_config(bz_product, bz_component) |
| qs = cconf.get('bz_new_bugs_quicksearch') |
| if not qs: |
| logger.debug('No quicksearch defined for %s/%s', bz_product, bz_component) |
| continue |
| logger.info('Querying matching quicksearch results since %s for %s/%s, qs=%s', since, bz_product, |
| bz_component, qs) |
| params = { |
| 'chfieldfrom': since, |
| 'product': bz_product, |
| 'component': bz_component, |
| 'quicksearch': qs, |
| } |
| buglist = peebz.bz_get_query_bugs(params, exclude=seen) |
| if buglist: |
| logger.info('Processing %s matching quicksearch bugs', len(buglist)) |
| for bid in buglist: |
| seen.add(bid) |
| process_new_comments(bid, dry_run=cmdargs.dry_run) |
| else: |
| logger.info('No changed bugs matching these parameters.') |
| |
| if not cmdargs.dry_run: |
| peebz.db_store_meta_value(key='notify_last_run', value=lastrun) |