blob: f6785cc937e7aff2eadc8712100a4bec3dfaeae7 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023 by the Linux Foundation
import sys
import argparse
import peebz
import b4
import re
from typing import Tuple, Dict, List
import email.message
import email.utils
logger = peebz.logger
def new_bug_notification(bid: int, inre_cid: int, dry_run: bool = False):
msg = email.message.EmailMessage()
config = peebz.get_config()
bodyvals = {
'bzname': config['bugzilla'].get('name'),
'bug_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid),
}
bodytpt = peebz.get_template_by_bid('new_bug_notify', bid)
body = bodytpt.safe_substitute(bodyvals)
sigtpt = peebz.get_template_by_bid('botsig', bid)
sigvals = {
'myname': config['bugzilla'].get('name'),
'appname': peebz.__APPNAME__,
'appver': peebz.__VERSION__,
}
body += sigtpt.safe_substitute(sigvals)
msg.set_payload(body, charset='utf-8')
peebz.notify_bug(bid, None, msg, inre_cid=inre_cid, dry_run=dry_run)
def make_bug_desc_from_body(product: str, component: str, body: str, vals: Dict) -> str:
if 'comment_count' in vals and vals['comment_count']:
tpt_intro = peebz.get_template_by_product_component('parse_bug_intro_with_count', product, component)
else:
tpt_intro = peebz.get_template_by_product_component('parse_bug_intro', product, component)
tpt_outro = peebz.get_template_by_product_component('parse_bug_outro', product, component)
desc = ''
intro = tpt_intro.safe_substitute(vals)
if intro:
desc = intro + '\n\n'
desc += body.strip()
outro = tpt_outro.safe_substitute(vals)
if outro:
desc += '\n\n' + outro
desc += '\n'
return desc
def new_bug_from_msg(msg: email.message.EmailMessage, product: str, component: str,
dry_run: bool = False) -> Tuple[int, int]:
msgid, author, subject, body, atts = peebz.msg_parse_for_bug(msg)
payload = peebz.get_newbug_payload_by_product_component(product, component)
summary = re.sub(r'^\s*(Re|Fwd):\s*', '', subject)
vals = {
'author': b4.format_addrs([author]),
'msgid_link': peebz.get_msgid_link(msgid),
}
desc = make_bug_desc_from_body(product, component, body, vals)
payload['summary'] = summary
payload['description'] = desc
if not dry_run:
bid, cid = peebz.bz_add_new_bug(payload)
logger.debug('new bug bid=%s, cid=%s', bid, cid)
recipients = peebz.msg_get_recipients(msg)
peebz.db_store_recipients(bid, recipients)
if atts:
peebz.bz_add_atts_to_bug(bid, atts)
else:
logger.info('--- DRY RUN ---')
logger.info('Would have created a new bug in %s/%s:', product, component)
logger.info('Summary: %s', payload['summary'])
logger.info('Description:')
logger.info(payload['description'])
bid = cid = None
return bid, cid
def new_comment_from_msg(bid: int, cid: int, msg: email.message.EmailMessage, dry_run: bool = False) -> int:
msgid, author, subject, body, atts = peebz.msg_parse_for_bug(msg)
vals = {
'author': b4.format_addrs([author]),
'msgid_link': peebz.get_msgid_link(msgid),
}
if cid:
try:
vals['comment_count'] = peebz.bz_get_count_by_bid_cid(bid, cid)
except LookupError:
pass
product, component = peebz.bz_get_product_component_by_bid(bid)
desc = make_bug_desc_from_body(product, component, body, vals)
if not dry_run:
cid = peebz.bz_add_new_comment(bid, desc)
recipients = peebz.msg_get_recipients(msg)
peebz.db_store_recipients(bid, recipients)
if atts:
peebz.bz_add_atts_to_bug(bid, atts)
else:
logger.info('--- DRY RUN ---')
logger.info('Would have added this comment to %s', bid)
logger.info(desc)
cid = None
return cid
def get_assignee(msg: email.message.EmailMessage, regexes: List[str]) -> str:
payload = peebz.msg_get_payload(msg)
fromaddr = peebz.msg_get_author(msg)[1]
assignee = None
for regex in regexes:
matches = re.search(regex, payload, flags=re.I | re.M)
if matches:
assignee = matches.groups()[0]
if assignee == 'me':
logger.debug('me=%s', fromaddr)
assignee = fromaddr
# Does this user exist?
try:
peebz.bz_get_user(assignee)
logger.debug('found assignee=%s (matched regex: %s)', assignee, regex)
# First match wins
break
except LookupError:
logger.info('Unable to assign to %s: no such user', assignee)
assignee = None
return assignee
def process_rfc2822(msg: email.message.EmailMessage, product: str, component: str,
dry_run: bool = False) -> None:
# Ignore any messages that have an X-Bugzilla-Product header,
# so we don't get into any loops
if msg.get('x-bugzilla-product'):
logger.debug('Skipping bugzilla-originating message')
return
cconf = peebz.get_component_config(product, component)
# Get the message-id
msgid = b4.LoreMessage.get_clean_msgid(msg)
try:
# If we have this exact msgid, then it's a dupe
bid, cid = peebz.db_get_bid_cid_by_msgid(msgid)
logger.info('Already recorded as bid=%s, cid=%s', bid, cid)
return
except LookupError:
pass
# Walk through references and in-reply-to and see if we know any of them
bid = cid = None
try:
bid, cid = peebz.msg_get_inre_bid_cid(msg)
except LookupError:
pass
if bid:
bdata = peebz.bz_get_bug(bid)
if not bdata['is_open']:
logger.info('Bug %s is closed, not adding comments', bid)
sys.exit(0)
cid = new_comment_from_msg(bid, cid, msg, dry_run=dry_run)
if not dry_run:
peebz.db_store_msgid_bid_cid(msgid, bid, cid)
else:
bid, cid = new_bug_from_msg(msg, product, component, dry_run=dry_run)
if not dry_run:
peebz.db_store_msgid_bid_cid(msgid, bid, cid)
if cconf.get('new_bug_send_notification'):
new_bug_notification(bid, cid, dry_run=dry_run)
# Do we have any assign triggers?
assign_res = cconf.get('pi_assign_regexes')
if assign_res:
assignee = get_assignee(msg, assign_res)
if assignee:
# Is this person allowed to set assignees?
author = peebz.msg_get_author(msg)
fromaddr = author[1]
if peebz.bz_check_user_allowed(fromaddr, product, component):
if not dry_run:
peebz.bz_assign_bug(bid, assignee)
else:
logger.debug('---DRY RUN---')
logger.debug('Would have assigned bid=%s to %s', bid, assignee)
else:
logger.debug('User %s is not allowed to set assignees', fromaddr)
def main(cmdargs: argparse.Namespace) -> None:
msg = peebz.get_msg_from_stdin()
product = cmdargs.product
component = cmdargs.component
if not (product and component):
recipients = peebz.msg_get_recipients(msg)
try:
product, component = peebz.get_product_component_by_recipients(recipients)
except LookupError as ex:
# TODO: fail properly here
logger.info(str(ex))
sys.exit(1)
process_rfc2822(msg, product, component, dry_run=cmdargs.dry_run)