| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2023 by the Linux Foundation |
| |
| import sys |
| import argparse |
| import peebz |
| import b4 |
| import re |
| |
| from typing import Tuple, Dict, List |
| |
| import email.message |
| import email.utils |
| |
| logger = peebz.logger |
| |
| |
| def new_bug_notification(bid: int, inre_cid: int, dry_run: bool = False): |
| msg = email.message.EmailMessage() |
| config = peebz.get_config() |
| bodyvals = { |
| 'bzname': config['bugzilla'].get('name'), |
| 'bug_url': config['bugzilla'].get('bugmask', '').format(bug_id=bid), |
| } |
| bodytpt = peebz.get_template_by_bid('new_bug_notify', bid) |
| body = bodytpt.safe_substitute(bodyvals) |
| sigtpt = peebz.get_template_by_bid('botsig', bid) |
| sigvals = { |
| 'myname': config['bugzilla'].get('name'), |
| 'appname': peebz.__APPNAME__, |
| 'appver': peebz.__VERSION__, |
| } |
| body += sigtpt.safe_substitute(sigvals) |
| msg.set_payload(body, charset='utf-8') |
| peebz.notify_bug(bid, None, msg, inre_cid=inre_cid, dry_run=dry_run) |
| |
| |
| def make_bug_desc_from_body(product: str, component: str, body: str, vals: Dict) -> str: |
| if 'comment_count' in vals and vals['comment_count']: |
| tpt_intro = peebz.get_template_by_product_component('parse_bug_intro_with_count', product, component) |
| else: |
| tpt_intro = peebz.get_template_by_product_component('parse_bug_intro', product, component) |
| tpt_outro = peebz.get_template_by_product_component('parse_bug_outro', product, component) |
| desc = '' |
| intro = tpt_intro.safe_substitute(vals) |
| if intro: |
| desc = intro + '\n\n' |
| desc += body.strip() |
| outro = tpt_outro.safe_substitute(vals) |
| if outro: |
| desc += '\n\n' + outro |
| desc += '\n' |
| |
| return desc |
| |
| |
| def new_bug_from_msg(msg: email.message.EmailMessage, product: str, component: str, |
| dry_run: bool = False) -> Tuple[int, int]: |
| msgid, author, subject, body, atts = peebz.msg_parse_for_bug(msg) |
| payload = peebz.get_newbug_payload_by_product_component(product, component) |
| summary = re.sub(r'^\s*(Re|Fwd):\s*', '', subject) |
| vals = { |
| 'author': b4.format_addrs([author]), |
| 'msgid_link': peebz.get_msgid_link(msgid), |
| } |
| desc = make_bug_desc_from_body(product, component, body, vals) |
| |
| payload['summary'] = summary |
| payload['description'] = desc |
| if not dry_run: |
| bid, cid = peebz.bz_add_new_bug(payload) |
| logger.debug('new bug bid=%s, cid=%s', bid, cid) |
| recipients = peebz.msg_get_recipients(msg) |
| peebz.db_store_recipients(bid, recipients) |
| if atts: |
| peebz.bz_add_atts_to_bug(bid, atts) |
| else: |
| logger.info('--- DRY RUN ---') |
| logger.info('Would have created a new bug in %s/%s:', product, component) |
| logger.info('Summary: %s', payload['summary']) |
| logger.info('Description:') |
| logger.info(payload['description']) |
| bid = cid = None |
| |
| return bid, cid |
| |
| |
| def new_comment_from_msg(bid: int, cid: int, msg: email.message.EmailMessage, dry_run: bool = False) -> int: |
| msgid, author, subject, body, atts = peebz.msg_parse_for_bug(msg) |
| vals = { |
| 'author': b4.format_addrs([author]), |
| 'msgid_link': peebz.get_msgid_link(msgid), |
| } |
| if cid: |
| try: |
| vals['comment_count'] = peebz.bz_get_count_by_bid_cid(bid, cid) |
| except LookupError: |
| pass |
| product, component = peebz.bz_get_product_component_by_bid(bid) |
| |
| desc = make_bug_desc_from_body(product, component, body, vals) |
| |
| if not dry_run: |
| cid = peebz.bz_add_new_comment(bid, desc) |
| recipients = peebz.msg_get_recipients(msg) |
| peebz.db_store_recipients(bid, recipients) |
| if atts: |
| peebz.bz_add_atts_to_bug(bid, atts) |
| else: |
| logger.info('--- DRY RUN ---') |
| logger.info('Would have added this comment to %s', bid) |
| logger.info(desc) |
| cid = None |
| return cid |
| |
| |
| def get_assignee(msg: email.message.EmailMessage, regexes: List[str]) -> str: |
| payload = peebz.msg_get_payload(msg) |
| fromaddr = peebz.msg_get_author(msg)[1] |
| assignee = None |
| |
| for regex in regexes: |
| matches = re.search(regex, payload, flags=re.I | re.M) |
| if matches: |
| assignee = matches.groups()[0] |
| if assignee == 'me': |
| logger.debug('me=%s', fromaddr) |
| assignee = fromaddr |
| # Does this user exist? |
| try: |
| peebz.bz_get_user(assignee) |
| logger.debug('found assignee=%s (matched regex: %s)', assignee, regex) |
| # First match wins |
| break |
| except LookupError: |
| logger.info('Unable to assign to %s: no such user', assignee) |
| assignee = None |
| |
| return assignee |
| |
| |
| def process_rfc2822(msg: email.message.EmailMessage, product: str, component: str, |
| dry_run: bool = False) -> None: |
| # Ignore any messages that have an X-Bugzilla-Product header, |
| # so we don't get into any loops |
| if msg.get('x-bugzilla-product'): |
| logger.debug('Skipping bugzilla-originating message') |
| return |
| |
| cconf = peebz.get_component_config(product, component) |
| # Get the message-id |
| msgid = b4.LoreMessage.get_clean_msgid(msg) |
| try: |
| # If we have this exact msgid, then it's a dupe |
| bid, cid = peebz.db_get_bid_cid_by_msgid(msgid) |
| logger.info('Already recorded as bid=%s, cid=%s', bid, cid) |
| return |
| except LookupError: |
| pass |
| |
| # Walk through references and in-reply-to and see if we know any of them |
| bid = cid = None |
| try: |
| bid, cid = peebz.msg_get_inre_bid_cid(msg) |
| except LookupError: |
| pass |
| |
| if bid: |
| bdata = peebz.bz_get_bug(bid) |
| if not bdata['is_open']: |
| logger.info('Bug %s is closed, not adding comments', bid) |
| sys.exit(0) |
| |
| cid = new_comment_from_msg(bid, cid, msg, dry_run=dry_run) |
| if not dry_run: |
| peebz.db_store_msgid_bid_cid(msgid, bid, cid) |
| else: |
| bid, cid = new_bug_from_msg(msg, product, component, dry_run=dry_run) |
| if not dry_run: |
| peebz.db_store_msgid_bid_cid(msgid, bid, cid) |
| if cconf.get('new_bug_send_notification'): |
| new_bug_notification(bid, cid, dry_run=dry_run) |
| |
| # Do we have any assign triggers? |
| assign_res = cconf.get('pi_assign_regexes') |
| if assign_res: |
| assignee = get_assignee(msg, assign_res) |
| if assignee: |
| # Is this person allowed to set assignees? |
| author = peebz.msg_get_author(msg) |
| fromaddr = author[1] |
| if peebz.bz_check_user_allowed(fromaddr, product, component): |
| if not dry_run: |
| peebz.bz_assign_bug(bid, assignee) |
| else: |
| logger.debug('---DRY RUN---') |
| logger.debug('Would have assigned bid=%s to %s', bid, assignee) |
| else: |
| logger.debug('User %s is not allowed to set assignees', fromaddr) |
| |
| |
| def main(cmdargs: argparse.Namespace) -> None: |
| msg = peebz.get_msg_from_stdin() |
| product = cmdargs.product |
| component = cmdargs.component |
| if not (product and component): |
| recipients = peebz.msg_get_recipients(msg) |
| try: |
| product, component = peebz.get_product_component_by_recipients(recipients) |
| except LookupError as ex: |
| # TODO: fail properly here |
| logger.info(str(ex)) |
| sys.exit(1) |
| process_rfc2822(msg, product, component, dry_run=cmdargs.dry_run) |