blob: 32f59ccf1f96634d9e34829f67ffcb83114d9706 [file] [log] [blame]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2023 by the Linux Foundation
import argparse
import peebz
import peebz.parse
import b4
import urllib.parse
import email.message
import gzip
import datetime
import re
from typing import List, Set
# Use the peebz logger in this module and hand the same object to b4, so
# messages emitted by b4 go through the same logger.
logger = b4.logger = peebz.logger
# Force b4 to use the EmailMessage factory by giving it the peebz email policy.
b4.emlpolicy = peebz.emlpolicy
def get_query_results(query_url: str) -> List:
    """Run a search query against a public-inbox server and return the messages.

    The query URL is requested with POST (required for the server to answer
    with an mbox file), the gzip-compressed response body is decompressed and
    handed to ``b4.split_and_dedupe_pi_results``.

    Args:
        query_url: Fully-formed search URL, including the query string.

    Returns:
        Whatever ``b4.split_and_dedupe_pi_results`` returns for the mbox data.

    Raises:
        LookupError: If the server answers 404, answers with any other
            non-200 status, or returns an empty mbox.
    """
    loc = urllib.parse.urlparse(query_url)
    logger.debug('query=%s', query_url)
    logger.debug('grabbing search results from %s', loc.netloc)
    session = peebz.get_requests_session()
    # For the query to retrieve a mbox file, we need to send a POST request
    resp = session.post(query_url, data='')
    try:
        if resp.status_code == 404:
            # Format the message explicitly: exceptions do not interpolate
            # extra positional arguments the way logging calls do.
            raise LookupError('Nothing matching query=%s' % query_url)
        if resp.status_code != 200:
            raise LookupError('Server returned an error for %s: %s' % (query_url, resp.status_code))
        t_mbox = gzip.decompress(resp.content)
    finally:
        # Release the connection on every path, including errors.
        resp.close()
    if not t_mbox:
        raise LookupError('Nothing matching query=%s' % query_url)
    return b4.split_and_dedupe_pi_results(t_mbox)
def get_sorted_thread(url: str, msgid: str) -> List[email.message.EmailMessage]:
    """Fetch the thread for a message-id and return it sorted by received order.

    Downloads ``<url>/<quoted msgid>/t.mbox.gz``, decompresses it, splits and
    dedupes the messages with ``b4.split_and_dedupe_pi_results``, narrows them
    with ``b4.get_strict_thread`` and sorts the result with
    ``peebz.sort_msgs_by_received``.

    Args:
        url: Base URL of the public-inbox archive (trailing slash optional).
        msgid: Message-id identifying the thread to retrieve.

    Returns:
        The messages of the strict thread, as ordered by
        ``peebz.sort_msgs_by_received``.

    Raises:
        LookupError: If the server answers 404, answers with any other
            non-200 status, or the mbox contains no messages.
    """
    loc = urllib.parse.urlparse(url)
    mbox_url = url.rstrip('/') + '/' + urllib.parse.quote_plus(msgid) + '/t.mbox.gz'
    logger.debug('mbox_url=%s', mbox_url)
    logger.debug('grabbing thread from %s', loc.netloc)
    session = peebz.get_requests_session()
    resp = session.get(mbox_url)
    try:
        if resp.status_code == 404:
            # Format the message explicitly: exceptions do not interpolate
            # extra positional arguments the way logging calls do.
            raise LookupError('Nothing matching mbox_url=%s' % mbox_url)
        if resp.status_code != 200:
            raise LookupError('Server returned an error for %s: %s' % (mbox_url, resp.status_code))
        t_mbox = gzip.decompress(resp.content)
    finally:
        # Release the connection on every path, including errors.
        resp.close()
    deduped = b4.split_and_dedupe_pi_results(t_mbox)
    if not deduped:
        raise LookupError('No messages matching mbox_url=%s' % mbox_url)
    strict = b4.get_strict_thread(deduped, msgid)
    return peebz.sort_msgs_by_received(strict)
def get_new_msgs(msgs: List[email.message.EmailMessage]) -> List[email.message.EmailMessage]:
    """Return the messages from *msgs* that are not yet recorded in the database.

    A message counts as new when looking up its cleaned message-id with
    ``peebz.db_get_bid_cid_by_msgid`` raises ``LookupError``. The input order
    is preserved.
    """
    unrecorded = []
    for candidate in msgs:
        clean_id = b4.LoreMessage.get_clean_msgid(candidate)
        try:
            peebz.db_get_bid_cid_by_msgid(clean_id)
        except LookupError:
            # No bug/comment mapping exists for this message-id yet.
            unrecorded.append(candidate)
    return unrecorded
def get_tracked_bug_msgids(product: str, component: str) -> Set[str]:
    """Collect the message-ids of the bugs tracked for a product/component.

    Queries the ``bug`` REST endpoint through ``peebz.bz_rest`` asking only for
    bug ids, using the defaults ``quicksearch=OPEN`` and ``chfieldfrom=90d``
    (presumably: open bugs changed within the last 90 days -- confirm against
    the Bugzilla REST documentation). Any ``bz_query_params`` found in the
    component configuration override or extend those defaults.

    Returns:
        The set of message-ids that ``peebz.db_get_msgid_by_bid_cid`` knows
        for the returned bugs; bugs without a database entry are skipped.
    """
    component_conf = peebz.get_component_config(product, component)
    search = {
        'include_fields': 'id',
        'product': product,
        'component': component,
        'quicksearch': 'OPEN',
        'chfieldfrom': '90d',
    }
    # Per-component settings take precedence over the defaults above.
    search.update(component_conf.get('bz_query_params', dict()))
    response = peebz.bz_rest('bug', params=search)

    tracked = set()
    for bug in response.get('bugs', list()):
        bug_id = bug['id']
        try:
            root_msgid = peebz.db_get_msgid_by_bid_cid(bug_id, None)
        except LookupError:
            logger.debug('Not tracking bid=%s', bug_id)
            continue
        logger.debug('bid=%s is tracked as msgid=%s', bug_id, root_msgid)
        tracked.add(root_msgid)
    return tracked
def _matches_any_trigger(payload: str, trigger_res: List[str]) -> bool:
    """Return True when at least one regex in *trigger_res* is found in *payload*.

    Matching is case-insensitive and multi-line (``re.I | re.M``).
    """
    for trigger_re in trigger_res:
        if re.search(trigger_re, payload, flags=re.I | re.M):
            logger.debug('found trigger_regex: %s', trigger_re)
            return True
    return False


def update_component(product: str, component: str, dry_run: bool = False):
    """Sync new public-inbox messages into Bugzilla for one product/component.

    Works in three steps:

    1. For every thread already tracked as a bug (see
       ``get_tracked_bug_msgids``), fetch the thread and queue the messages
       that have no database record yet.
    2. If the component configures ``pi_query``, run it (restricted to
       messages newer than the stored last-check timestamp, when one exists).
       Each hit from an allowed author that matches the optional
       ``pi_trigger_regexes`` has its entire thread queued.
    3. Pass every queued message to ``peebz.parse.process_rfc2822``.

    Args:
        product: Bugzilla product name.
        component: Bugzilla component name.
        dry_run: When true, the last-check timestamp is not stored and the
            flag is forwarded to ``peebz.parse.process_rfc2822``.
    """
    logger.info('Running pi2bz for %s/%s, dry_run=%s', product, component, dry_run)
    cconf = peebz.get_component_config(product, component)
    tracked = get_tracked_bug_msgids(product, component)
    url = cconf.get('pi_url').rstrip('/')
    # Taken before any network work so the stored timestamp predates every
    # fetch below. Timezone-aware replacement for the deprecated utcnow();
    # the strftime() output further down is identical.
    now = datetime.datetime.now(datetime.timezone.utc)

    seen_msgids = set()
    updates = list()
    if tracked:
        logger.info('Checking for updates in %s tracked threads', len(tracked))
    for msgid in tracked:
        try:
            tmsgs = get_sorted_thread(url, msgid)
        except LookupError:
            logger.debug('No results returned for msgid=%s', msgid)
            continue
        for tmsg in tmsgs:
            tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
            if tmsgid in seen_msgids:
                logger.debug('Already seen %s', tmsgid)
                continue
            seen_msgids.add(tmsgid)
            try:
                peebz.db_get_bid_cid_by_msgid(tmsgid)
                logger.debug('%s has already been processed', tmsgid)
                continue
            except LookupError:
                logger.debug('New message in tracked thread: %s', tmsgid)
                updates.append(tmsg)

    # Now grab the latest query matches
    query = cconf.get('pi_query')
    if query:
        logger.info('Running query for %s/%s', product, component)
        try:
            last_check = peebz.db_get_query_last_check(product, component)
        except LookupError:
            # No stored timestamp: run the query without a date restriction.
            pass
        else:
            query += f' AND dt:{last_check}..'
        qquery = urllib.parse.quote_plus(query)
        # url already has its trailing slash stripped above
        query_url = url + f'/?x=m&q={qquery}'
        # Give a 10-minute overlap buffer
        bufferago = now - datetime.timedelta(minutes=10)
        lastdt = bufferago.strftime('%Y%m%d%H%M%S')
        # The trigger configuration does not change between messages.
        trigger_res = cconf.get('pi_trigger_regexes', list())
        # NOTE: the try deliberately spans the whole loop, as before, so a
        # LookupError raised by any helper below ends the query pass instead
        # of propagating to the caller.
        try:
            msgs = get_query_results(query_url)
            for msg in msgs:
                msgid = b4.LoreMessage.get_clean_msgid(msg)
                if msgid in seen_msgids:
                    logger.debug('Already seen %s', msgid)
                    continue

                # New thing to track!
                seen_msgids.add(msgid)
                author = peebz.msg_get_author(msg)
                fromaddr = author[1]
                if not peebz.bz_check_user_allowed(fromaddr, product, component):
                    logger.debug('author=%s not allowed, skipping msg %s', fromaddr, msg.get('Subject'))
                    continue

                # Check fine trigger, if configured
                if trigger_res:
                    payload = peebz.msg_get_payload(msg)
                    if not _matches_any_trigger(payload, trigger_res):
                        logger.debug('trigger_regexes not found, skipping msg %s', msg.get('Subject'))
                        continue

                # Retrieve and queue up the entire thread
                try:
                    tmsgs = get_sorted_thread(url, msgid)
                except LookupError:
                    logger.debug('No results returned for msgid=%s', msgid)
                    continue
                for tmsg in tmsgs:
                    tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
                    seen_msgids.add(tmsgid)
                    updates.append(tmsg)
        except LookupError:
            logger.info('No new results for product=%s, component=%s', product, component)

        if not dry_run:
            peebz.db_store_query_last_check(product, component, lastdt)

    if not updates:
        logger.info('No new messages to add to bugzilla for %s/%s', product, component)
        return

    for msg in updates:
        logger.debug('Recording %s', msg.get('Subject'))
        peebz.parse.process_rfc2822(msg, product, component, dry_run=dry_run)
def main(cmdargs: argparse.Namespace):
    """Run ``update_component`` for every product/component pair in the config.

    Reads the ``components`` table from ``peebz.get_config()`` and forwards
    ``cmdargs.dry_run`` to each update.
    """
    components_by_product = peebz.get_config()['components']
    # Iterate all components
    for product_name, component_table in components_by_product.items():
        for component_name in component_table.keys():
            update_component(product_name, component_name, dry_run=cmdargs.dry_run)