Move todo operations into the main thread

For ease of dealing with interrupt signals, run main queue operations in
the main thread.

Also add some stub systemd unit files for running the grok-pull service and
the grok-fsck timers.

Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
diff --git a/contrib/grok-fsck@.service b/contrib/grok-fsck@.service
new file mode 100644
index 0000000..9427bd1
--- /dev/null
+++ b/contrib/grok-fsck@.service
@@ -0,0 +1,9 @@
+[Unit]
+Description=Grok-fsck service for %I
+Documentation=https://github.com/mricon/grokmirror
+
+[Service]
+Type=oneshot
+ExecStart=/usr/bin/grok-fsck -c /etc/grokmirror/%i.conf
+User=mirror
+Group=mirror
diff --git a/contrib/grok-fsck@.timer b/contrib/grok-fsck@.timer
new file mode 100644
index 0000000..0363fdd
--- /dev/null
+++ b/contrib/grok-fsck@.timer
@@ -0,0 +1,9 @@
+[Unit]
+Description=Grok-fsck timer for %I
+Documentation=https://github.com/mricon/grokmirror
+
+[Timer]
+OnCalendar=Sat 04:00
+
+[Install]
+WantedBy=timers.target
diff --git a/contrib/grok-pull@.service b/contrib/grok-pull@.service
new file mode 100644
index 0000000..b73e6d3
--- /dev/null
+++ b/contrib/grok-pull@.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Grok-pull service for %I
+After=network.target
+Documentation=https://github.com/mricon/grokmirror
+
+[Service]
+ExecStart=/usr/bin/grok-pull -c /etc/grokmirror/%i.conf
+Type=simple
+User=mirror
+Group=mirror
+
+[Install]
+WantedBy=multi-user.target
diff --git a/grokmirror/__init__.py b/grokmirror/__init__.py
index dfa56d1..631d7d7 100644
--- a/grokmirror/__init__.py
+++ b/grokmirror/__init__.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright (C) 2013-2018 by The Linux Foundation and contributors
+# Copyright (C) 2013-2020 by The Linux Foundation and contributors
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -45,7 +45,6 @@
 # default logger. Will probably be overridden.
 logger = logging.getLogger(__name__)
 
-_alt_repo_set = None
 _alt_repo_map = None
 
 # Used to store our requests session
@@ -871,3 +870,50 @@
     config.last_modified = fstat[8]
 
     return config
+
+
+def is_precious(fullpath):
+    args = ['config', '--get', 'extensions.preciousObjects']
+    retcode, output, error = run_git_command(fullpath, args)
+    if output.strip().lower() in ('yes', 'true', '1'):
+        return True
+    return False
+
+
+def get_repack_level(obj_info, max_loose_objects=1200, max_packs=20, pc_loose_objects=10, pc_loose_size=10):
+    # for now, hardcode the maximum loose objects and packs
+    # XXX: we can probably set this in git config values?
+    #      I don't think this makes sense as a global setting, because
+    #      optimal values will depend on the size of the repo as a whole
+    packs = int(obj_info['packs'])
+    count_loose = int(obj_info['count'])
+
+    needs_repack = 0
+
+    # first, compare against max values:
+    if packs >= max_packs:
+        logger.debug('Triggering full repack because packs > %s', max_packs)
+        needs_repack = 2
+    elif count_loose >= max_loose_objects:
+        logger.debug('Triggering quick repack because loose objects > %s', max_loose_objects)
+        needs_repack = 1
+    else:
+        # is the number of loose objects or their size more than 10% of
+        # the overall total?
+        in_pack = int(obj_info['in-pack'])
+        size_loose = int(obj_info['size'])
+        size_pack = int(obj_info['size-pack'])
+        total_obj = count_loose + in_pack
+        total_size = size_loose + size_pack
+        # set some arbitrary "worth bothering" limits so we don't
+        # continuously repack tiny repos.
+        if total_obj > 500 and count_loose / total_obj * 100 >= pc_loose_objects:
+            logger.debug('Triggering repack because loose objects > %s%% of total', pc_loose_objects)
+            needs_repack = 1
+        elif total_size > 1024 and size_loose / total_size * 100 >= pc_loose_size:
+            logger.debug('Triggering repack because loose size > %s%% of total', pc_loose_size)
+            needs_repack = 1
+
+    return needs_repack
+
+
diff --git a/grokmirror/fsck.py b/grokmirror/fsck.py
index 9040b08..b6cec05 100755
--- a/grokmirror/fsck.py
+++ b/grokmirror/fsck.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright (C) 2013-2018 by The Linux Foundation and contributors
+# Copyright (C) 2013-2020 by The Linux Foundation and contributors
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -24,6 +24,7 @@
 import random
 import datetime
 import shutil
+import gc
 
 from fcntl import lockf, LOCK_EX, LOCK_UN, LOCK_NB
 
@@ -334,11 +335,7 @@
 
 
 def check_precious_objects(fullpath):
-    args = ['config', '--get', 'extensions.preciousObjects']
-    retcode, output, error = grokmirror.run_git_command(fullpath, args)
-    if output.strip().lower() == 'true':
-        return True
-    return False
+    return grokmirror.is_precious(fullpath)
 
 
 def get_repack_level(obj_info, max_loose_objects=1200, max_packs=20, pc_loose_objects=10, pc_loose_size=10):
@@ -735,13 +732,13 @@
                 logger.info('   queued: %s (full repack)', fullpath)
             else:
                 logger.info('   queued: %s (repack)', fullpath)
-            logger.info('         : %s queued, %s total', analyzed, len(status))
+            logger.info('      ---: %s queued, %s total', analyzed, len(status))
         elif repack_only or repack_all_quick or repack_all_full:
             continue
         elif schedcheck <= today or force:
             to_process.add((fullpath, 'fsck', None))
             logger.info('   queued: %s (fsck)', fullpath)
-            logger.info('         : %s queued, %s total', analyzed, len(status))
+            logger.info('      ---: %s queued, %s total', analyzed, len(status))
 
     if obst_changes:
         # Refresh the alt repo map cache
@@ -894,17 +891,16 @@
             to_process.add((obstrepo, 'repack', repack_level))
             if repack_level > 1:
                 logger.info('   queued: %s (full repack)', os.path.basename(obstrepo))
-                logger.info('         : %s queued, %s total', analyzed, len(obstrepos))
             else:
                 logger.info('   queued: %s (repack)', os.path.basename(obstrepo))
-                logger.info('         : %s queued, %s total', analyzed, len(obstrepos))
+            logger.info('      ---: %s queued, %s total', analyzed, len(obstrepos))
         elif repack_only or repack_all_quick or repack_all_full:
             continue
         elif (nextcheck <= today or force) and not repack_only:
             status[obstrepo]['nextcheck'] = nextcheck.strftime('%F')
             to_process.add((obstrepo, 'fsck', None))
             logger.info('   queued: %s (fsck)', os.path.basename(obstrepo))
-            logger.info('         : %s queued, %s total', analyzed, len(obstrepos))
+            logger.info('      ---: %s queued, %s total', analyzed, len(obstrepos))
 
     if obst_changes:
         # We keep the same mtime, because the repos themselves haven't changed
@@ -915,6 +911,12 @@
         logger.info('No repos need attention.')
         return
 
+    # Delete some vars that are huge for large repo sets -- we no longer need them and the
+    # next step will likely eat lots of ram.
+    del obst_roots
+    del top_roots
+    gc.collect()
+
     logger.info('Processing %s repositories', len(to_process))
 
     for fullpath, action, repack_level in to_process:
@@ -957,7 +959,7 @@
         total_checked += 1
         total_elapsed += elapsed
         logger.info('     done: %ss', elapsed)
-        logger.info('           %s done, %s total', total_checked, len(to_process))
+        logger.info('      ---: %s done, %s queued', total_checked, len(to_process)-total_checked)
 
         # Write status file after each check, so if the process dies, we won't
         # have to recheck all the repos we've already checked
diff --git a/grokmirror/manifest.py b/grokmirror/manifest.py
index f8f6efa..7dc9510 100755
--- a/grokmirror/manifest.py
+++ b/grokmirror/manifest.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright (C) 2013-2018 by The Linux Foundation and contributors
+# Copyright (C) 2013-2020 by The Linux Foundation and contributors
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -180,7 +180,6 @@
 
     logger.setLevel(logging.DEBUG)
     # noinspection PyTypeChecker
-    em = enlighten.get_manager(series=' -=#')
 
     ch = logging.StreamHandler()
     formatter = logging.Formatter('%(message)s')
@@ -190,7 +189,6 @@
         ch.setLevel(logging.INFO)
     else:
         ch.setLevel(logging.CRITICAL)
-        em.enabled = False
 
     logger.addHandler(ch)
 
@@ -246,7 +244,6 @@
         # whole file when there is nothing in it or it can't be parsed.
         gitdirs = [os.path.realpath(x) for x in args]
         # Don't draw a progress bar for a single repo
-        em.enabled = False
 
     symlinks = list()
     tofetch = set()
diff --git a/grokmirror/pull.py b/grokmirror/pull.py
index 3722cd1..892204b 100755
--- a/grokmirror/pull.py
+++ b/grokmirror/pull.py
@@ -28,6 +28,7 @@
 import subprocess
 import shutil
 import tempfile
+import signal
 
 import calendar
 import uuid
@@ -41,6 +42,42 @@
 logger = logging.getLogger(__name__)
 
 
+class SignalHandler:
+
+    def __init__(self, config, sw, pws, done):
+        self.config = config
+        self.sw = sw
+        self.pws = pws
+        self.done = done
+        self.killed = False
+
+    def _handler(self, signum, frame):
+        self.killed = True
+        logger.debug('Received signum=%s, frame=%s', signum, frame)
+        self.sw.terminate()
+        self.sw.join()
+
+        for pw in self.pws:
+            pw.terminate()
+            pw.join()
+
+        if len(self.done):
+            update_manifest(self.config, self.done)
+
+        logger.info('Exiting on signal %s', signum)
+        sys.exit(0)
+
+    def __enter__(self):
+        self.old_sigint = signal.signal(signal.SIGINT, self._handler)
+        self.old_sigterm = signal.signal(signal.SIGTERM, self._handler)
+
+    def __exit__(self, sigtype, value, traceback):
+        if self.killed:
+            sys.exit(0)
+        signal.signal(signal.SIGINT, self.old_sigint)
+        signal.signal(signal.SIGTERM, self.old_sigterm)
+
+
 class Handler(StreamRequestHandler):
 
     def handle(self):
@@ -224,6 +261,13 @@
                                 logger.debug('Could not pack-refs in %s', fullpath)
                             # Run a bigger repack if objstore is involved
                             repack = True
+                        else:
+                            if not grokmirror.is_precious(fullpath):
+                                # See if doing a quick repack would be beneficial
+                                obj_info = grokmirror.get_repo_obj_info(fullpath)
+                                if grokmirror.get_repack_level(obj_info):
+                                    # We only do quick repacks, so we don't care about precise level
+                                    repack = True
 
                     modified = repoinfo.get('modified')
                     if modified is not None:
@@ -244,14 +288,18 @@
                     action = 'repack'
 
         if action == 'repack':
-            # Should only trigger after initial clone with objstore repo support, in order
-            # to remove a lot of duplicate objects. All other repacking should be done as part of grok-fsck
             logger.debug('quick-repacking %s', fullpath)
             args = ['repack', '-Adlq']
             logger.info('   repack: %s', gitdir)
             ecode, out, err = grokmirror.run_git_command(fullpath, args)
             if ecode > 0:
                 logger.debug('Could not repack %s', fullpath)
+            else:
+                logger.info(' packrefs: %s', gitdir)
+                args = ['pack-refs']
+                ecode, out, err = grokmirror.run_git_command(fullpath, args)
+                if ecode > 0:
+                    logger.debug('Could not pack-refs %s', fullpath)
 
         symlinks = repoinfo.get('symlinks')
         if os.path.exists(fullpath) and symlinks:
@@ -765,191 +813,20 @@
         logger.debug('queued: %s, %s', gitdir, action)
 
     if new_actions:
-        logger.info(' manifest: %s new actions', len(new_actions))
+        logger.info(' manifest: %s new updates', len(new_actions))
     else:
-        logger.info(' manifest: nothing to do')
+        logger.info(' manifest: no new updates')
 
     return new_actions
 
 
-def todo_worker(config, q_todo, q_pull, q_done, runonce, nomtime, forcepurge):
-    toplevel = os.path.realpath(config['core'].get('toplevel'))
-    obstdir = os.path.realpath(config['core'].get('objstore'))
-    refresh = config['pull'].getint('refresh', 300)
-    actions = set()
-    new_actions = fill_todo_from_manifest(config, actions, nomtime=nomtime, forcepurge=forcepurge)
-    if not len(new_actions) and runonce:
-        return
-
-    for (gitdir, repoinfo, action) in new_actions:
-        actions.add((gitdir, action))
-        q_todo.put((gitdir, repoinfo, action))
-
-    pull_threads = config['pull'].getint('pull_threads', 0)
-    if pull_threads < 1:
-        # take half of available CPUs by default
-        logger.info('pull_threads is not set, consider setting it')
-        pull_threads = int(mp.cpu_count() / 2)
-    pws = list()
-
-    busy = set()
-    done = list()
-    good = 0
-    bad = 0
-    loopmark = None
-    saidsleeping = False
-    lastrun = time.time()
-    while True:
-        for pw in pws:
-            if not pw.is_alive():
-                pws.remove(pw)
-                logger.info('   worker: terminated (%s remaining)', len(pws))
-                logger.info('      ---:  %s done, %s active, %s queued, %s failed',
-                            good, len(pws), q_pull.qsize() + q_todo.qsize(), bad)
-                continue
-
-        # Any new results?
-        try:
-            gitdir, repoinfo, q_action, success = q_done.get_nowait()
-            try:
-                actions.remove((gitdir, q_action))
-            except KeyError:
-                pass
-
-            forkgroup = repoinfo.get('forkgroup')
-            if forkgroup and forkgroup in busy:
-                busy.remove(forkgroup)
-            done.append((gitdir, repoinfo, q_action, success))
-            if success:
-                good += 1
-            else:
-                bad += 1
-            logger.info('     done: %s', gitdir)
-            logger.info('      ---: %s done, %s active, %s queued, %s failed',
-                        good, len(pws), q_pull.qsize() + q_todo.qsize(), bad)
-            if len(done) >= 100:
-                # Write manifest every 100 repos
-                update_manifest(config, done)
-                done = list()
-
-        except queue.Empty:
-            pass
-
-        if not runonce and time.time() - lastrun >= refresh:
-            new_actions = fill_todo_from_manifest(config, actions, nomtime=False, forcepurge=forcepurge)
-            for (gitdir, repoinfo, action) in new_actions:
-                actions.add((gitdir, action))
-                q_todo.put((gitdir, repoinfo, action))
-            lastrun = time.time()
-            saidsleeping = False
-
-        try:
-            gitdir, repoinfo, q_action = q_todo.get_nowait()
-        except queue.Empty:
-            if not len(pws) and q_pull.empty() and q_done.empty():
-                if done:
-                    update_manifest(config, done)
-                    done = list()
-                    if runonce:
-                        return
-                else:
-                    if not saidsleeping and not runonce:
-                        logger.info(' manifest: sleeping (%ss)', int(lastrun - time.time() + refresh))
-                        saidsleeping = True
-                    time.sleep(1)
-                continue
-            else:
-                time.sleep(0.1)
-                continue
-
-        if (gitdir, q_action) not in actions:
-            # Probably fed straight to the queue by the socket listener
-            actions.add((gitdir, q_action))
-        if repoinfo is None:
-            repoinfo = dict()
-
-        fullpath = os.path.join(toplevel, gitdir.lstrip('/'))
-        forkgroup = repoinfo.get('forkgroup')
-        if gitdir in busy or (forkgroup is not None and forkgroup in busy):
-            # Stick it back into the queue
-            q_todo.put((gitdir, repoinfo, q_action))
-            if loopmark is None:
-                loopmark = gitdir
-            elif loopmark == gitdir:
-                # sleep for a bit if we recognize that we've looped around all waiting repos
-                time.sleep(0.1)
-            continue
-
-        if gitdir == loopmark:
-            loopmark = None
-
-        if len(pws) < pull_threads:
-            pw = mp.Process(target=pull_worker, args=(config, q_pull, q_done))
-            pw.start()
-            pws.append(pw)
-            logger.info('   worker: started (%s running)', len(pws))
-
-        if q_action == 'objstore_migrate':
-            # Add forkgroup to busy, so we don't run any pulls until it's done
-            busy.add(repoinfo['forkgroup'])
-            obstrepo = grokmirror.setup_objstore_repo(obstdir, name=forkgroup)
-            grokmirror.add_repo_to_objstore(obstrepo, fullpath)
-            grokmirror.set_altrepo(fullpath, obstrepo)
-
-        if q_action != 'init':
-            # Easy actions that don't require priority logic
-            q_pull.put((gitdir, repoinfo, q_action, q_action))
-            continue
-
-        try:
-            grokmirror.lock_repo(fullpath, nonblocking=True)
-        except IOError:
-            # Stick it back into the queue until we are able to lock
-            # XXX: Is this the right thing to do here?
-            q_todo.put((gitdir, repoinfo, q_action))
-            continue
-
-        if not grokmirror.setup_bare_repo(fullpath):
-            logger.critical('Unable to bare-init %s', fullpath)
-            q_done.put((gitdir, repoinfo, q_action, False))
-            continue
-
-        fix_remotes(toplevel, gitdir, config['remote'].get('site'))
-        set_repo_params(fullpath, repoinfo)
-        grokmirror.unlock_repo(fullpath)
-
-        forkgroup = repoinfo.get('forkgroup')
-        if not forkgroup:
-            logger.debug('no-sibling clone: %s', gitdir)
-            q_pull.put((gitdir, repoinfo, 'pull', q_action))
-            continue
-
-        obstrepo = os.path.join(obstdir, '%s.git' % forkgroup)
-        if os.path.isdir(obstrepo):
-            logger.debug('clone %s with existing obstrepo %s', gitdir, obstrepo)
-            grokmirror.set_altrepo(fullpath, obstrepo)
-            if not repoinfo['private']:
-                grokmirror.add_repo_to_objstore(obstrepo, fullpath)
-            q_pull.put((gitdir, repoinfo, 'pull', q_action))
-            continue
-
-        # Set up a new obstrepo and make sure it's not used until the initial
-        # pull is done
-        logger.debug('cloning %s with new obstrepo %s', gitdir, obstrepo)
-        busy.add(forkgroup)
-        obstrepo = grokmirror.setup_objstore_repo(obstdir, name=forkgroup)
-        grokmirror.set_altrepo(fullpath, obstrepo)
-        if not repoinfo['private']:
-            grokmirror.add_repo_to_objstore(obstrepo, fullpath)
-        q_pull.put((gitdir, repoinfo, 'pull', q_action))
-
-
 def update_manifest(config, entries):
     manifile = config['core'].get('manifest')
     grokmirror.manifest_lock(manifile)
     manifest = grokmirror.read_manifest(manifile)
     changed = False
-    for gitdir, repoinfo, action, success in entries:
+    while len(entries):
+        gitdir, repoinfo, action, success = entries.pop()
         if not success:
             continue
         if action == 'purge':
@@ -981,6 +858,15 @@
     grokmirror.manifest_unlock(manifile)
 
 
+def socket_worker(config, q_todo, sockfile):
+    logger.info(' listener: listening on socket %s', sockfile)
+    with ThreadedUnixStreamServer(sockfile, Handler) as server:
+        # Stick some objects into the server
+        server.q_todo = q_todo
+        server.config = config
+        server.serve_forever()
+
+
 def pull_mirror(config, verbose=False, nomtime=False, forcepurge=False, runonce=False):
     global logger
 
@@ -1014,30 +900,191 @@
     # push it into grokmirror to override the default logger
     grokmirror.logger = logger
 
+    toplevel = os.path.realpath(config['core'].get('toplevel'))
+    obstdir = os.path.realpath(config['core'].get('objstore'))
+    refresh = config['pull'].getint('refresh', 300)
+
     q_todo = mp.Queue()
     q_pull = mp.Queue()
     q_done = mp.Queue()
 
-    mw = mp.Process(target=todo_worker, args=(config, q_todo, q_pull, q_done, runonce, nomtime, forcepurge))
-    mw.start()
+    sw = None
     sockfile = config['pull'].get('socket')
-    if runonce or sockfile is None:
-        mw.join()
+    if sockfile and not runonce:
+        if os.path.exists(sockfile):
+            mode = os.stat(sockfile).st_mode
+            if stat.S_ISSOCK(mode):
+                os.unlink(sockfile)
+            else:
+                raise IOError('File exists but is not a socket: %s' % sockfile)
+
+        sw = mp.Process(target=socket_worker, args=(config, q_todo, sockfile))
+        sw.start()
+
+    pws = list()
+    actions = set()
+    new_actions = fill_todo_from_manifest(config, actions, nomtime=nomtime, forcepurge=forcepurge)
+    if not len(new_actions) and runonce:
         return 0
 
-    if os.path.exists(sockfile):
-        mode = os.stat(sockfile).st_mode
-        if stat.S_ISSOCK(mode):
-            os.unlink(sockfile)
-        else:
-            raise IOError('File exists but is not a socket: %s' % sockfile)
+    for (gitdir, repoinfo, action) in new_actions:
+        actions.add((gitdir, action))
+        q_todo.put((gitdir, repoinfo, action))
 
-    logger.info(' listener: listening on socket %s', sockfile)
-    with ThreadedUnixStreamServer(sockfile, Handler) as server:
-        # Stick some objects into the server
-        server.q_todo = q_todo
-        server.config = config
-        server.serve_forever()
+    pull_threads = config['pull'].getint('pull_threads', 0)
+    if pull_threads < 1:
+        # take half of available CPUs by default
+        logger.info('pull_threads is not set, consider setting it')
+        pull_threads = int(mp.cpu_count() / 2)
+
+    busy = set()
+    done = list()
+    good = 0
+    bad = 0
+    loopmark = None
+    saidsleeping = False
+    lastrun = time.time()
+    with SignalHandler(config, sw, pws, done):
+        while True:
+            for pw in pws:
+                if not pw.is_alive():
+                    pws.remove(pw)
+                    logger.info('   worker: terminated (%s remaining)', len(pws))
+                    logger.info('      ---:  %s done, %s active, %s queued, %s failed',
+                                good, len(pws), q_pull.qsize() + q_todo.qsize(), bad)
+                    continue
+
+            # Any new results?
+            try:
+                gitdir, repoinfo, q_action, success = q_done.get_nowait()
+                try:
+                    actions.remove((gitdir, q_action))
+                except KeyError:
+                    pass
+
+                forkgroup = repoinfo.get('forkgroup')
+                if forkgroup and forkgroup in busy:
+                    busy.remove(forkgroup)
+                done.append((gitdir, repoinfo, q_action, success))
+                if success:
+                    good += 1
+                else:
+                    bad += 1
+                logger.info('     done: %s', gitdir)
+                logger.info('      ---: %s done, %s active, %s queued, %s failed',
+                            good, len(pws), q_pull.qsize() + q_todo.qsize(), bad)
+                if len(done) >= 100:
+                    # Write manifest every 100 repos
+                    update_manifest(config, done)
+
+            except queue.Empty:
+                pass
+
+            if not runonce and time.time() - lastrun >= refresh:
+                new_actions = fill_todo_from_manifest(config, actions, nomtime=False, forcepurge=forcepurge)
+                for (gitdir, repoinfo, action) in new_actions:
+                    actions.add((gitdir, action))
+                    q_todo.put((gitdir, repoinfo, action))
+                lastrun = time.time()
+                saidsleeping = False
+
+            try:
+                gitdir, repoinfo, q_action = q_todo.get_nowait()
+            except queue.Empty:
+                if not len(pws) and q_pull.empty() and q_done.empty():
+                    if done:
+                        update_manifest(config, done)
+                        if runonce:
+                            return 0
+                    else:
+                        if not saidsleeping and not runonce:
+                            logger.info(' manifest: sleeping (%ss)', int(lastrun - time.time() + refresh))
+                            saidsleeping = True
+                        time.sleep(1)
+                    continue
+                else:
+                    time.sleep(0.1)
+                    continue
+
+            if (gitdir, q_action) not in actions:
+                # Probably fed straight to the queue by the socket listener
+                actions.add((gitdir, q_action))
+            if repoinfo is None:
+                repoinfo = dict()
+
+            fullpath = os.path.join(toplevel, gitdir.lstrip('/'))
+            forkgroup = repoinfo.get('forkgroup')
+            if gitdir in busy or (forkgroup is not None and forkgroup in busy):
+                # Stick it back into the queue
+                q_todo.put((gitdir, repoinfo, q_action))
+                if loopmark is None:
+                    loopmark = gitdir
+                elif loopmark == gitdir:
+                    # sleep for a bit if we recognize that we've looped around all waiting repos
+                    time.sleep(0.1)
+                continue
+
+            if gitdir == loopmark:
+                loopmark = None
+
+            if len(pws) < pull_threads:
+                pw = mp.Process(target=pull_worker, args=(config, q_pull, q_done))
+                pw.start()
+                pws.append(pw)
+                logger.info('   worker: started (%s running)', len(pws))
+
+            if q_action == 'objstore_migrate':
+                # Add forkgroup to busy, so we don't run any pulls until it's done
+                busy.add(repoinfo['forkgroup'])
+                obstrepo = grokmirror.setup_objstore_repo(obstdir, name=forkgroup)
+                grokmirror.add_repo_to_objstore(obstrepo, fullpath)
+                grokmirror.set_altrepo(fullpath, obstrepo)
+
+            if q_action != 'init':
+                # Easy actions that don't require priority logic
+                q_pull.put((gitdir, repoinfo, q_action, q_action))
+                continue
+
+            try:
+                grokmirror.lock_repo(fullpath, nonblocking=True)
+            except IOError:
+                if not runonce:
+                    q_todo.put((gitdir, repoinfo, q_action))
+                continue
+
+            if not grokmirror.setup_bare_repo(fullpath):
+                logger.critical('Unable to bare-init %s', fullpath)
+                q_done.put((gitdir, repoinfo, q_action, False))
+                continue
+
+            fix_remotes(toplevel, gitdir, config['remote'].get('site'))
+            set_repo_params(fullpath, repoinfo)
+            grokmirror.unlock_repo(fullpath)
+
+            forkgroup = repoinfo.get('forkgroup')
+            if not forkgroup:
+                logger.debug('no-sibling clone: %s', gitdir)
+                q_pull.put((gitdir, repoinfo, 'pull', q_action))
+                continue
+
+            obstrepo = os.path.join(obstdir, '%s.git' % forkgroup)
+            if os.path.isdir(obstrepo):
+                logger.debug('clone %s with existing obstrepo %s', gitdir, obstrepo)
+                grokmirror.set_altrepo(fullpath, obstrepo)
+                if not repoinfo['private']:
+                    grokmirror.add_repo_to_objstore(obstrepo, fullpath)
+                q_pull.put((gitdir, repoinfo, 'pull', q_action))
+                continue
+
+            # Set up a new obstrepo and make sure it's not used until the initial
+            # pull is done
+            logger.debug('cloning %s with new obstrepo %s', gitdir, obstrepo)
+            busy.add(forkgroup)
+            obstrepo = grokmirror.setup_objstore_repo(obstdir, name=forkgroup)
+            grokmirror.set_altrepo(fullpath, obstrepo)
+            if not repoinfo['private']:
+                grokmirror.add_repo_to_objstore(obstrepo, fullpath)
+            q_pull.put((gitdir, repoinfo, 'pull', q_action))
 
     return 0
 
diff --git a/requirements.txt b/requirements.txt
index 808e267..a4a793f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,2 @@
-enlighten
 packaging
 requests
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 00877a2..6584686 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-# Copyright (C) 2013-2018 by The Linux Foundation and contributors
+# Copyright (C) 2013-2020 by The Linux Foundation and contributors
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -55,7 +55,6 @@
     },
     install_requires=[
         'requests',
-        'enlighten',
     ],
     python_requires='>=3.6',
     entry_points={