| #!/usr/bin/env python3 |
| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2023 by the Linux Foundation |
| |
| import argparse |
| import peebz |
| import peebz.parse |
| import b4 |
| import urllib.parse |
| import email.message |
| import gzip |
| import datetime |
| import re |
| |
| from typing import List, Set |
| |
# Use the peebz logger in this module and hand the same object to b4, so
# messages logged through b4.logger go to the same logger.
logger = b4.logger = peebz.logger
# Make b4 build messages with the peebz email policy (EmailMessage factory).
b4.emlpolicy = peebz.emlpolicy
| |
| |
def get_query_results(query_url: str) -> List:
    """Fetch the gzipped mbox returned for a search query and split it into messages.

    Args:
        query_url: full URL of the search query; it is sent as an empty POST.

    Returns:
        Whatever b4.split_and_dedupe_pi_results() produces for the
        decompressed mbox.

    Raises:
        LookupError: when the server answers 404, answers with any other
            non-200 status, or returns an empty mbox.
    """
    loc = urllib.parse.urlparse(query_url)
    logger.debug('query=%s', query_url)
    logger.debug('grabbing search results from %s', loc.netloc)
    session = peebz.get_requests_session()
    # For the query to retrieve a mbox file, we need to send a POST request
    resp = session.post(query_url, data='')
    try:
        if resp.status_code == 404:
            # Format the message here: exceptions do not interpolate
            # printf-style arguments the way logger calls do.
            raise LookupError('Nothing matching query=%s' % query_url)
        if resp.status_code != 200:
            raise LookupError('Server returned an error for %s: %s' % (query_url, resp.status_code))
        t_mbox = gzip.decompress(resp.content)
    finally:
        # Release the connection on the error paths as well.
        resp.close()
    if not t_mbox:
        raise LookupError('Nothing matching query=%s' % query_url)
    return b4.split_and_dedupe_pi_results(t_mbox)
| |
| |
def get_sorted_thread(url: str, msgid: str) -> List[email.message.EmailMessage]:
    """Download the thread mbox for a message-id and return its messages sorted.

    The mbox is requested from ``<url>/<quoted msgid>/t.mbox.gz``, decompressed,
    deduplicated with b4.split_and_dedupe_pi_results(), narrowed with
    b4.get_strict_thread() and ordered with peebz.sort_msgs_by_received().

    Args:
        url: base URL of the archive; a trailing slash is tolerated.
        msgid: message-id of a message in the thread.

    Returns:
        The result of peebz.sort_msgs_by_received() for the strict thread.

    Raises:
        LookupError: when the server answers 404, answers with any other
            non-200 status, or the mbox contains no messages.
    """
    loc = urllib.parse.urlparse(url)
    mbox_url = url.rstrip('/') + '/' + urllib.parse.quote_plus(msgid) + '/t.mbox.gz'
    logger.debug('mbox_url=%s', mbox_url)
    logger.debug('grabbing thread from %s', loc.netloc)
    session = peebz.get_requests_session()
    resp = session.get(mbox_url)
    try:
        if resp.status_code == 404:
            # Format the message here: exceptions do not interpolate
            # printf-style arguments the way logger calls do.
            raise LookupError('Nothing matching mbox_url=%s' % mbox_url)
        if resp.status_code != 200:
            raise LookupError('Server returned an error for %s: %s' % (mbox_url, resp.status_code))
        t_mbox = gzip.decompress(resp.content)
    finally:
        # Release the connection on the error paths as well.
        resp.close()

    deduped = b4.split_and_dedupe_pi_results(t_mbox)
    if not deduped:
        raise LookupError('No messages matching mbox_url=%s' % mbox_url)
    # NOTE(review): confirm b4.get_strict_thread() always returns a list here;
    # its result is passed straight to the sorter without a check.
    strict = b4.get_strict_thread(deduped, msgid)
    return peebz.sort_msgs_by_received(strict)
| |
| |
def get_new_msgs(msgs: List[email.message.EmailMessage]) -> List[email.message.EmailMessage]:
    """Return the messages from msgs that have no record in the peebz database.

    A message counts as new when peebz.db_get_bid_cid_by_msgid() raises
    LookupError for its cleaned message-id. Input order is preserved.
    """

    def _is_unrecorded(candidate: email.message.EmailMessage) -> bool:
        # A failed lookup is the signal that the message was never recorded.
        clean_id = b4.LoreMessage.get_clean_msgid(candidate)
        try:
            peebz.db_get_bid_cid_by_msgid(clean_id)
        except LookupError:
            return True
        return False

    return [candidate for candidate in msgs if _is_unrecorded(candidate)]
| |
| |
def get_tracked_bug_msgids(product: str, component: str) -> Set[str]:
    """Collect the message-ids stored for the bugs matched in this product/component.

    Bugs are requested through peebz.bz_rest('bug', ...) with a base set of
    search parameters; any ``bz_query_params`` in the component config are
    applied on top and may override them. Bugs with no message-id in the
    peebz database are skipped.

    Returns:
        The set of message-ids found for the matched bugs.
    """
    component_cfg = peebz.get_component_config(product, component)
    search = {
        'include_fields': 'id',
        'product': product,
        'component': component,
        'quicksearch': 'OPEN',
        'chfieldfrom': '90d',
    }
    # Component-level overrides win over the defaults above.
    search.update(component_cfg.get('bz_query_params', dict()))
    found = peebz.bz_rest('bug', params=search)

    tracked_ids = set()
    for bug in found.get('bugs', list()):
        bug_id = bug['id']
        try:
            tracked_msgid = peebz.db_get_msgid_by_bid_cid(bug_id, None)
        except LookupError:
            logger.debug('Not tracking bid=%s', bug_id)
            continue
        logger.debug('bid=%s is tracked as msgid=%s', bug_id, tracked_msgid)
        tracked_ids.add(tracked_msgid)

    return tracked_ids
| |
| |
def _matches_trigger(msg: email.message.EmailMessage, trigger_res: List[str]) -> bool:
    """Return True when any regex in trigger_res matches the payload of msg."""
    payload = peebz.msg_get_payload(msg)
    for trigger_re in trigger_res:
        if re.search(trigger_re, payload, flags=re.I | re.M):
            logger.debug('found trigger_regex: %s', trigger_re)
            return True
    return False


def update_component(product: str, component: str, dry_run: bool = False):
    """Find new messages for one product/component and hand them to peebz.parse.

    Two sources are checked:

    1. Threads of already tracked bugs (see get_tracked_bug_msgids()): every
       message in those threads that has no record in the peebz database is
       queued.
    2. The component's ``pi_query``, if configured: for each matching message
       from an allowed author (and matching one of ``pi_trigger_regexes``,
       when configured), the whole thread is queued.

    Queued messages are passed to peebz.parse.process_rfc2822() in order.

    Args:
        product: product name used for config and database lookups.
        component: component name used for config and database lookups.
        dry_run: when True, the query last-check timestamp is not stored and
            the flag is forwarded to process_rfc2822().
    """
    logger.info('Running pi2bz for %s/%s, dry_run=%s', product, component, dry_run)
    cconf = peebz.get_component_config(product, component)
    tracked = get_tracked_bug_msgids(product, component)
    url = cconf.get('pi_url').rstrip('/')
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC datetime
    # formats to the same strftime() output below.
    now = datetime.datetime.now(datetime.timezone.utc)

    seen_msgids = set()
    updates = list()
    if tracked:
        logger.info('Checking for updates in %s tracked threads', len(tracked))
        for msgid in tracked:
            try:
                tmsgs = get_sorted_thread(url, msgid)
            except LookupError:
                logger.debug('No results returned for msgid=%s', msgid)
                continue

            for tmsg in tmsgs:
                tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
                if tmsgid in seen_msgids:
                    logger.debug('Already seen %s', tmsgid)
                    continue
                seen_msgids.add(tmsgid)
                try:
                    peebz.db_get_bid_cid_by_msgid(tmsgid)
                    logger.debug('%s has already been processed', tmsgid)
                    continue
                except LookupError:
                    logger.debug('New message in tracked thread: %s', tmsgid)
                    updates.append(tmsg)

    # Now grab the latest query matches
    query = cconf.get('pi_query')
    if query:
        logger.info('Running query for %s/%s', product, component)
        try:
            last_check = peebz.db_get_query_last_check(product, component)
            query += f' AND dt:{last_check}..'
        except LookupError:
            # No previous check recorded: run the query without a date bound.
            pass
        qquery = urllib.parse.quote_plus(query)
        # url already had its trailing slash stripped above
        query_url = url + f'/?x=m&q={qquery}'
        # Give a 10-minute overlap buffer
        bufferago = now - datetime.timedelta(minutes=10)
        lastdt = bufferago.strftime('%Y%m%d%H%M%S')
        # The trigger list does not change per message, so look it up once.
        trigger_res = cconf.get('pi_trigger_regexes', list())
        try:
            msgs = get_query_results(query_url)
            for msg in msgs:
                msgid = b4.LoreMessage.get_clean_msgid(msg)
                if msgid in seen_msgids:
                    logger.debug('Already seen %s', msgid)
                    continue

                # New thing to track!
                seen_msgids.add(msgid)
                author = peebz.msg_get_author(msg)
                fromaddr = author[1]
                if not peebz.bz_check_user_allowed(fromaddr, product, component):
                    logger.debug('author=%s not allowed, skipping msg %s', fromaddr, msg.get('Subject'))
                    continue
                # Check fine trigger, if configured
                if trigger_res and not _matches_trigger(msg, trigger_res):
                    logger.debug('trigger_regexes not found, skipping msg %s', msg.get('Subject'))
                    continue

                # Retrieve and queue up the entire thread
                try:
                    tmsgs = get_sorted_thread(url, msgid)
                except LookupError:
                    logger.debug('No results returned for msgid=%s', msgid)
                    continue
                for tmsg in tmsgs:
                    tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
                    seen_msgids.add(tmsgid)
                    updates.append(tmsg)

        except LookupError:
            # NOTE(review): get_query_results() raises LookupError for server
            # errors as well as for empty results; both end up here and the
            # last-check timestamp below is still advanced.
            logger.info('No new results for product=%s, component=%s', product, component)

        if not dry_run:
            peebz.db_store_query_last_check(product, component, lastdt)

    if not updates:
        logger.info('No new messages to add to bugzilla for %s/%s', product, component)
        return

    for msg in updates:
        logger.debug('Recording %s', msg.get('Subject'))
        peebz.parse.process_rfc2822(msg, product, component, dry_run=dry_run)
| |
| |
def main(cmdargs: argparse.Namespace):
    """Run update_component() for every product/component pair in the peebz config.

    Only ``cmdargs.dry_run`` is read from the parsed command-line arguments.
    """
    products = peebz.get_config()['components']
    # Walk the product -> components mapping in configuration order.
    for bz_product, bz_components in products.items():
        for bz_component in bz_components:
            update_component(bz_product, bz_component, dry_run=cmdargs.dry_run)