blob: b78a648959e969530c9370890db870b9a9892a91 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023 by the Linux Foundation
import argparse
import peebz
import datetime
import re
import b4
import email.message
from fnmatch import fnmatch
logger = peebz.logger
def process_new_comments(bid: int, dry_run: bool = False):
config = peebz.get_config()
cdatas = peebz.bz_get_newest_comments_for_bid(bid)
for cdata in cdatas:
# Check if we've already notified about this bug
cid = cdata['id']
try:
peebz.db_get_msgid_by_bid_cid(bid, cid)
logger.debug('Skipping, msgid match for bid=%s, cid=%s', bid, cid)
continue
except LookupError:
pass
# Check if the creator is in never_if_creator
skip = False
for mask in config['notify'].get('never_if_creator', list()):
if fnmatch(cdata['creator'], mask):
logger.debug('Skipping cid=%s because it matched never_if_creator=%s', cid, mask)
skip = True
break
if skip:
continue
# Check if text is in never_if_text_matches
for mask in config['notify'].get('never_if_text_matches', list()):
if fnmatch(cdata['text'], mask):
logger.debug('Skipping cid=%s because it matched never_if_text_matches=%s', cid, mask)
skip = True
break
if skip:
continue
clines = cdata['text'].strip().splitlines()
inre_cid = None
bodyvals = {
'bzname': config['bugzilla'].get('name'),
'bug_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid),
'comment_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid) + f"#c{cdata['count']}",
'comment_author': cdata['creator'],
}
if cdata['attachment_id']:
logger.info('Processing new attachment for bug_id=%s, comment_id=%s', bid, cid)
adata = peebz.bz_get_attachment_by_aid(
cdata['attachment_id'],
include_fields='file_name,size,content_type,summary,is_patch,is_private',
)
if adata['is_private']:
logger.debug('Skipping attachment marked private')
continue
bodytpt = peebz.get_template_by_bid('new_attachment_notify', bid)
bodyvals.update(adata)
bodyvals['attachment_url'] = config['bugzilla'].get('attachmask', '').format(
attachment_id=cdata['attachment_id'])
else:
logger.info('Processing new comment for bug_id=%s, comment_id=%s', bid, cid)
fline = clines[0]
matches = re.search(r'\(In reply to.*from comment #(\d+)', fline, flags=re.I)
if matches:
inre_count = int(matches.groups()[0])
try:
inre_cid = peebz.bz_get_cid_by_bid_count(bid, inre_count)
except LookupError:
pass
bodyvals['comment_text'] = '\n'.join(clines)
bodytpt = peebz.get_template_by_bid('new_comment_notify', bid)
msg = email.message.EmailMessage()
msg['Reply-To'] = b4.format_addrs([('', cdata['creator'])])
body = bodytpt.safe_substitute(bodyvals)
body = peebz.add_bot_signature(body)
msg.set_payload(body, charset='utf-8')
msgid = peebz.notify_bug(bid, cid, msg, inre_cid=inre_cid, dry_run=dry_run)
if msgid and not dry_run:
peebz.db_store_msgid_bid_cid(msgid, bid, cid)
peebz.db_store_recipients(bid, {cdata['creator']})
# TODO: This assumes that comments are always in incremental order
lastcheck = cdata['creation_time'].replace('T', ' ').rstrip('Z')
peebz.db_store_notify_last_check(bid, lastcheck)
def main(cmdargs: argparse.Namespace) -> None:
now = datetime.datetime.utcnow()
lastrun = now.strftime('%Y-%m-%d %H:%M:%S')
try:
# Get all new bugs that changed since last run
since = peebz.db_get_meta_value('notify_last_run')
except LookupError:
logger.debug('Got a LookupError, getting everything for the past hour')
# Assume it's the first run and get changes for the past hour
hourago = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
since = hourago.strftime('%Y-%m-%d %H:%M:%S')
# first, process all changed bugs that we're tracking
logger.info('Getting a list of changed bugs since %s', since)
buglist = peebz.bz_get_changed_bugs(since)
seen = set()
if buglist:
for bdata in buglist:
logger.debug('Looking at %s: %s', bdata['id'], bdata['summary'])
bid = bdata['id']
seen.add(bid)
process_new_comments(bid, dry_run=cmdargs.dry_run)
else:
logger.info('No changes to any tracked bugs')
# Now go by product/component and handle new bug queries if defined
config = peebz.get_config()
for bz_product, bz_components in config['components'].items():
for bz_component in bz_components.keys():
cconf = peebz.get_component_config(bz_product, bz_component)
qs = cconf.get('bz_new_bugs_quicksearch')
if not qs:
logger.debug('No quicksearch defined for %s/%s', bz_product, bz_component)
continue
logger.info('Querying matching quicksearch results since %s for %s/%s, qs=%s', since, bz_product,
bz_component, qs)
params = {
'chfieldfrom': since,
'product': bz_product,
'component': bz_component,
'quicksearch': qs,
}
buglist = peebz.bz_get_query_bugs(params, exclude=seen)
if buglist:
logger.info('Processing %s matching quicksearch bugs', len(buglist))
for bid in buglist:
seen.add(bid)
process_new_comments(bid, dry_run=cmdargs.dry_run)
else:
logger.info('No changed bugs matching these parameters.')
if not cmdargs.dry_run:
peebz.db_store_meta_value(key='notify_last_run', value=lastrun)