pi2bz: retrieve new messages in updated threads only

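Use the new public-inbox per-thread search (a POST request to
<msgid>/?x=m&q=dt:<last-check>..) to retrieve only the messages that have
arrived since the last check in each tracked thread, instead of downloading
every tracked thread in full on each run.

The public-inbox retrieval helpers move from pi2bz.py into the peebz package,
and the last-check timestamp is now stored on every non-dry run, not only
when a pi_query is configured for the component.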

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
diff --git a/peebz/__init__.py b/peebz/__init__.py
index 9e2da76..2574d44 100644
--- a/peebz/__init__.py
+++ b/peebz/__init__.py
@@ -10,6 +10,7 @@
 import sys
 import uuid
 import datetime
+import gzip
 
 import urllib.parse
 
@@ -25,6 +26,9 @@
 emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None,
                                      message_factory=email.message.EmailMessage)
 
+# force b4 to use EmailMessage factory
+b4.emlpolicy = emlpolicy
+
 REQSESSION = None
 CONFIG = dict()
 
@@ -848,6 +852,10 @@
     return msgid
 
 
+def get_msgid(msg: email.message.EmailMessage) -> str:
+    return b4.LoreMessage.get_clean_msgid(msg)
+
+
 def notify_bug(bid: int, cid: Optional[int], msg: email.message.EmailMessage, inre_cid: Optional[int] = None,
                dry_run: bool = False) -> str:
     bdata = bz_get_bug(bid, resolve_dupes=True)
@@ -908,6 +916,51 @@
     return body
 
 
+def pi_get_mbox(resp) -> List[email.message.EmailMessage]:
+    if resp.status_code != 200:
+        raise LookupError('Server returned an error: %s' % resp.status_code)
+    t_mbox = gzip.decompress(resp.content)
+    resp.close()
+    if not len(t_mbox):
+        raise LookupError('mbox is empty')
+    # noinspection PyTypeChecker
+    return b4.split_and_dedupe_pi_results(t_mbox)
+
+
+def pi_get_query_results(query_url: str) -> List[email.message.EmailMessage]:
+    loc = urllib.parse.urlparse(query_url)
+    logger.debug('query=%s', query_url)
+    logger.debug('grabbing search results from %s', loc.netloc)
+    session = get_requests_session()
+    # For the query to return an mbox file, we need to send a POST request
+    resp = session.post(query_url, data='')
+    return pi_get_mbox(resp)
+
+
+def pi_get_new_since(url, msgid, since: str) -> List[email.message.EmailMessage]:
+    loc = urllib.parse.urlparse(url)
+    logger.debug('grabbing thread from %s', loc.netloc)
+    mbox_url = url.rstrip('/') + '/' + urllib.parse.quote_plus(msgid)
+    session = get_requests_session()
+    if since:
+        mbox_url += '/?x=m&q=' + urllib.parse.quote_plus(f'dt:{since}..')
+        return pi_get_query_results(mbox_url)
+
+    mbox_url += '/t.mbox.gz'
+    logger.debug('mbox_url=%s', mbox_url)
+    resp = session.get(mbox_url)
+    msgs = pi_get_mbox(resp)
+    # for whole threads, return a strict thread
+    return b4.get_strict_thread(msgs, msgid)
+
+
+def pi_get_sorted_thread(url: str, msgid: str, since: Optional[str] = None) -> List[email.message.EmailMessage]:
+    msgs = pi_get_new_since(url, msgid, since)
+    if not msgs:
+        raise LookupError('No new messages matching url=%s, msgid=%s, since=%s' % (url, msgid, since))
+    return sort_msgs_by_received(msgs)
+
+
 def db_get_sa() -> Tuple[sa.engine.Engine, sa.engine.Connection]:
     global SACONN, SAENGINE
     if SACONN is None:
diff --git a/peebz/pi2bz.py b/peebz/pi2bz.py
index 32f59cc..e511d56 100644
--- a/peebz/pi2bz.py
+++ b/peebz/pi2bz.py
@@ -6,71 +6,13 @@
 import argparse
 import peebz
 import peebz.parse
-import b4
 import urllib.parse
-import email.message
-import gzip
 import datetime
 import re
 
-from typing import List, Set
+from typing import Set
 
 logger = peebz.logger
-b4.logger = logger
-# force b4 to use EmailMessage factory
-b4.emlpolicy = peebz.emlpolicy
-
-
-def get_query_results(query_url: str) -> List:
-    loc = urllib.parse.urlparse(query_url)
-    logger.debug('query=%s', query_url)
-    logger.debug('grabbing search results from %s', loc.netloc)
-    session = peebz.get_requests_session()
-    # For the query to retrieve a mbox file, we need to send a POST request
-    resp = session.post(query_url, data='')
-    if resp.status_code == 404:
-        raise LookupError('Nothing matching query=%s', query_url)
-    if resp.status_code != 200:
-        raise LookupError('Server returned an error for %s: %s' % (query_url, resp.status_code))
-    t_mbox = gzip.decompress(resp.content)
-    resp.close()
-    if not len(t_mbox):
-        raise LookupError('Nothing matching query=%s', query_url)
-    return b4.split_and_dedupe_pi_results(t_mbox)
-
-
-def get_sorted_thread(url: str, msgid: str) -> List[email.message.EmailMessage]:
-    loc = urllib.parse.urlparse(url)
-    mbox_url = url.rstrip('/') + '/' + urllib.parse.quote_plus(msgid) + '/t.mbox.gz'
-    logger.debug('mbox_url=%s', mbox_url)
-    logger.debug('grabbing thread from %s', loc.netloc)
-    session = peebz.get_requests_session()
-    resp = session.get(mbox_url)
-    if resp.status_code == 404:
-        raise LookupError('Nothing matching mbox_url=%s', mbox_url)
-    if resp.status_code != 200:
-        raise LookupError('Server returned an error for %s: %s' % (mbox_url, resp.status_code))
-    t_mbox = gzip.decompress(resp.content)
-    resp.close()
-
-    deduped = b4.split_and_dedupe_pi_results(t_mbox)
-    if not deduped:
-        raise LookupError('No messages matching mbox_url=%s' % mbox_url)
-    strict = b4.get_strict_thread(deduped, msgid)
-    return peebz.sort_msgs_by_received(strict)
-
-
-def get_new_msgs(msgs: List[email.message.EmailMessage]) -> List[email.message.EmailMessage]:
-    new_msgs = list()
-    for msg in msgs:
-        msgid = b4.LoreMessage.get_clean_msgid(msg)
-        try:
-            peebz.db_get_bid_cid_by_msgid(msgid)
-            continue
-        except LookupError:
-            new_msgs.append(msg)
-
-    return new_msgs
 
 
 def get_tracked_bug_msgids(product: str, component: str) -> Set[str]:
@@ -103,6 +45,10 @@
     tracked = get_tracked_bug_msgids(product, component)
     url = cconf.get('pi_url').rstrip('/')
     now = datetime.datetime.utcnow()
+    try:
+        last_check = peebz.db_get_query_last_check(product, component)
+    except LookupError:
+        last_check = None
 
     seen_msgids = set()
     updates = list()
@@ -110,13 +56,14 @@
         logger.info('Checking for updates in %s tracked threads', len(tracked))
         for msgid in tracked:
             try:
-                tmsgs = get_sorted_thread(url, msgid)
+                tmsgs = peebz.pi_get_sorted_thread(url, msgid, since=last_check)
+                logger.info('  %s new updates in: %s', len(tmsgs), msgid)
             except LookupError:
-                logger.debug('No results returned for msgid=%s', msgid)
+                logger.debug('  0 new updates in: %s', msgid)
                 continue
 
             for tmsg in tmsgs:
-                tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
+                tmsgid = peebz.get_msgid(tmsg)
                 if tmsgid in seen_msgids:
                     logger.debug('Already seen %s', tmsgid)
                     continue
@@ -133,20 +80,14 @@
     query = cconf.get('pi_query')
     if query:
         logger.info('Running query for %s/%s', product, component)
-        try:
-            last_check = peebz.db_get_query_last_check(product, component)
+        if last_check:
             query += f' AND dt:{last_check}..'
-        except LookupError:
-            pass
         qquery = urllib.parse.quote_plus(query)
         query_url = url.rstrip('/') + f'/?x=m&q={qquery}'
-        # Give a 10-minute overlap buffer
-        bufferago = now - datetime.timedelta(minutes=10)
-        lastdt = bufferago.strftime('%Y%m%d%H%M%S')
         try:
-            msgs = get_query_results(query_url)
+            msgs = peebz.pi_get_query_results(query_url)
             for msg in msgs:
-                msgid = b4.LoreMessage.get_clean_msgid(msg)
+                msgid = peebz.get_msgid(msg)
                 if msgid in seen_msgids:
                     logger.debug('Already seen %s', msgid)
                     continue
@@ -176,20 +117,23 @@
 
                 # Retrieve and queue up the entire thread
                 try:
-                    tmsgs = get_sorted_thread(url, msgid)
+                    tmsgs = peebz.pi_get_sorted_thread(url, msgid)
                 except LookupError:
                     logger.debug('No results returned for msgid=%s', msgid)
                     continue
                 for tmsg in tmsgs:
-                    tmsgid = b4.LoreMessage.get_clean_msgid(tmsg)
+                    tmsgid = peebz.get_msgid(tmsg)
                     seen_msgids.add(tmsgid)
                     updates.append(tmsg)
 
         except LookupError:
             logger.info('No new results for product=%s, component=%s', product, component)
 
-        if not dry_run:
-            peebz.db_store_query_last_check(product, component, lastdt)
+    if not dry_run:
+        # Give a 10-minute overlap buffer
+        bufferago = now - datetime.timedelta(minutes=10)
+        lastdt = bufferago.strftime('%Y%m%d%H%M%S')
+        peebz.db_store_query_last_check(product, component, lastdt)
 
     if not updates:
         logger.info('No new messages to add to bugzilla for %s/%s', product, component)