#!/usr/bin/env python3
# Grabs a file full of subscribers and makes sure that
# all new ones are subscribed and all the ones that are
# gone are unsubscribed.
#
# For every list directory under the mlmmj spool dir
# (e.g. /var/spool/mlmmj/list.name.foo), the subscriber source is either:
#   - a git checkout in .external-subscribers.git, pulled from
#     origin/master and read from its "subscribers" file; when present it
#     takes precedence over
#   - the URL stored in .external-subscribers-url. If a checksum.txt in the
#     same remote directory lists that file, the download is verified
#     against its sha256 sum.

__author__ = 'Konstantin Ryabitsev <konstantin@linuxfoundation.org>'
| |
| import argparse |
| import logging |
| import requests |
| import os |
| import sys |
| import subprocess |
| import hashlib |
| |
| from typing import Set, List, Optional, Tuple |
| from fcntl import lockf, LOCK_EX, LOCK_NB, LOCK_UN |
| |
| |
# Application identity, also reused for the logger name and User-Agent.
__APPNAME__ = 'mlmmj-subscriber-sync'
__VERSION__ = '0.1'

# mlmmj helper binaries used to add and remove subscribers.
MLMMJ_SUB = '/usr/bin/mlmmj-sub'
MLMMJ_UNSUB = '/usr/bin/mlmmj-unsub'
logger = logging.getLogger(__APPNAME__)

# Per-run cache of remote checksum files:
#   {checksum.txt URL: {file name: checksum}}
CSUMS = {}
| |
| |
def _run_command(args: List[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
    """Run *args* (no shell) and collect its exit status and output.

    The child's stdin, stdout and stderr are all pipes; *stdin*, when
    given, is written to the child's standard input.

    Returns ``(returncode, stdout_bytes, stderr_bytes)``.
    """
    logger.info('Running %s', ' '.join(args))
    with subprocess.Popen(args,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        captured_out, captured_err = proc.communicate(input=stdin)
    return proc.returncode, captured_out, captured_err
| |
| |
def get_git_subscribers(gitdir: str) -> Tuple[Optional[str], Set[str]]:
    """Refresh a git checkout of subscribers and return its contents.

    Runs ``git pull origin master -q`` inside *gitdir*. A failed pull is
    logged and the existing checkout is used as-is. Then reads the
    ``subscribers`` file from it: one address per line, with blank lines and
    lines starting with ``#`` ignored.

    Returns ``(None, subscribers)``; git-backed lists carry no checksum.
    Raises FileNotFoundError (from ``open``) if there is no ``subscribers``
    file in the checkout.
    """
    # currently, we only support origin/master
    curdir = os.getcwd()
    os.chdir(gitdir)
    try:
        ecode, out, err = _run_command(['git', 'pull', 'origin', 'master', '-q'])
    finally:
        # Restore the working directory even if git could not be started
        os.chdir(curdir)
    # A negative returncode means git was killed by a signal: also a failure
    if ecode != 0:
        logger.info('Was not able to pull %s', gitdir)
        logger.info(err.decode(errors='replace'))

    subsfile = os.path.join(gitdir, 'subscribers')
    subs = set()
    with open(subsfile, 'r') as fh:
        for line in fh:
            line = line.strip()
            # A blank line is skipped, not treated as end of file, so
            # addresses after it are not lost
            if not line or line.startswith('#'):
                continue
            subs.add(line)

    logger.info('Loaded %s remote subscribers from %s', len(subs), subsfile)
    return None, subs
| |
| |
def _fetch_checksums(rses: 'requests.Session', ckpath: str, timeout: float) -> dict:
    """Download *ckpath* and parse it as a sha256sum-style checksum list.

    Returns a ``{file name: checksum}`` dict. The dict is empty when the
    server does not answer with HTTP 200. Lines that are not of the form
    ``<checksum> <file name>`` are logged and skipped, instead of raising a
    ValueError that the sync loop does not catch.
    """
    sums = dict()
    resp = rses.get(ckpath, timeout=timeout)
    if resp.status_code != 200:
        return sums
    for line in resp.text.strip().splitlines():
        parts = line.split(maxsplit=1)
        if len(parts) != 2:
            logger.debug('Ignoring malformed line in %s: %r', ckpath, line)
            continue
        csum, clname = parts
        # GNU coreutils prefixes the file name with '*' for binary-mode sums
        if clname.startswith('*'):
            clname = clname[1:]
        sums[clname] = csum
    return sums


def get_remote_subscribers(mldir: str, timeout: float = 60) -> Tuple[Optional[str], Set[str]]:
    """Return ``(checksum, subscribers)`` from a list's external source.

    Source selection:
    - If ``.external-subscribers.git`` is a directory in *mldir*, the git
      checkout is used via get_git_subscribers().
    - Otherwise the URL stored in ``.external-subscribers-url`` is
      downloaded.

    A ``checksum.txt`` in the same remote directory is fetched once per run
    and cached in CSUMS. When it lists this file, the download is verified
    against that sha256 sum.

    *timeout* is the per-request HTTP timeout in seconds. requests reports
    connection and timeout failures with exceptions derived from OSError.

    Raises:
        FileNotFoundError: no usable URL, HTTP status other than 200,
            checksum mismatch, or content that is not valid UTF-8.
        FileExistsError: the remote checksum equals the one stored by the
            last successful sync, so there is nothing new to process.
    """
    # This overrides the presence of .external-subscribers-url
    gitdir = os.path.join(mldir, '.external-subscribers.git')
    if os.path.isdir(gitdir):
        return get_git_subscribers(gitdir)

    urlfile = os.path.join(mldir, '.external-subscribers-url')
    if not os.path.exists(urlfile):
        raise FileNotFoundError('No .external-subscribers-url defined for %s' % mldir)
    with open(urlfile, 'r') as fh:
        url = fh.read().strip()
    if '/' not in url:
        # rsplit() below needs a directory part; treat this like a missing URL
        logger.info('No usable URL in %s', urlfile)
        raise FileNotFoundError('No usable URL in %s' % urlfile)
    rdir, lname = url.rsplit('/', maxsplit=1)
    ckpath = rdir + '/checksum.txt'

    with requests.Session() as rses:
        rses.headers.update({'User-Agent': f'{__APPNAME__}/{__VERSION__}'})
        # Grab the checksums file from that dir first. The result is cached
        # only if the fetch returns, so a network error is retried for the
        # next list in the same remote directory.
        if ckpath not in CSUMS:
            CSUMS[ckpath] = _fetch_checksums(rses, ckpath, timeout)
        csum = CSUMS[ckpath].get(lname)

        lastcsfile = os.path.join(mldir, '.external-subscribers-last-csum')
        try:
            with open(lastcsfile, 'r') as fh:
                lastcsum = fh.read().strip()
        except FileNotFoundError:
            lastcsum = None
        if csum is not None and lastcsum == csum:
            logger.debug('Remote checksum for %s is the same', lname)
            raise FileExistsError

        resp = rses.get(url, timeout=timeout)
        if resp.status_code != 200:
            logger.info('Unable to retrieve %s: %s', url, resp.text)
            raise FileNotFoundError
        bdata = resp.content

    if csum:
        # Hardcode checksum to sha256 for now
        mysum = hashlib.sha256(bdata).hexdigest()
        if mysum != csum.lower():
            logger.debug('Checksum for %s did NOT match, ignoring', url)
            raise FileNotFoundError

    try:
        data = bdata.decode()
    except UnicodeDecodeError:
        logger.info('Contents of %s are not valid UTF-8, ignoring', url)
        raise FileNotFoundError from None
    # One address per line. Surrounding whitespace and blank lines are dropped
    # so they never reach mlmmj-sub as bogus addresses.
    subs = {line.strip() for line in data.splitlines()}
    subs.discard('')
    logger.info('Loaded %s remote subscribers from %s', len(subs), url)
    return csum, subs
| |
| |
def get_last_subscribers(mldir: str) -> Set[str]:
    """Return the subscriber set recorded by the previous successful sync.

    Reads ``.external-subscribers-last`` in *mldir*, one address per line.
    Returns an empty set when that file does not exist.
    """
    lastfile = os.path.join(mldir, '.external-subscribers-last')
    if os.path.exists(lastfile):
        with open(lastfile, 'r') as fh:
            contents = fh.read()
        previous = set(contents.strip().splitlines())
        logger.info('Loaded %s last subscribers from %s', len(previous), lastfile)
        return previous

    logger.info('No lastfile in %s', mldir)
    return set()
| |
| |
def _write_atomically(path: str, content: str) -> None:
    """Write *content* to *path* via a sibling temp file and os.replace()."""
    tmppath = path + '.tmp'
    with open(tmppath, 'w') as fh:
        fh.write(content)
        fh.flush()
        os.fsync(fh.fileno())
    os.replace(tmppath, path)


def store_last(subs: Set[str], csum: Optional[str], mldir: str) -> None:
    """Record *subs* (and *csum*, if set) as the last synced state of *mldir*.

    Writes the addresses, sorted and one per line, to
    ``.external-subscribers-last``. When *csum* is truthy, also writes it to
    ``.external-subscribers-last-csum``.

    Each file is replaced atomically. An interrupted run therefore never
    leaves a truncated list behind, which would make the next run compute
    the wrong set of subscribe/unsubscribe changes.
    """
    lastfile = os.path.join(mldir, '.external-subscribers-last')
    logger.info('Storing %s with %s entries', lastfile, len(subs))
    _write_atomically(lastfile, '\n'.join(sorted(subs)) + '\n')
    if not csum:
        return
    lastcsfile = os.path.join(mldir, '.external-subscribers-last-csum')
    logger.info('Storing %s with checksum=%s', lastcsfile, csum)
    _write_atomically(lastcsfile, csum + '\n')
| |
| |
def _run_mlmmj(args: List[str], what: str) -> None:
    """Run one mlmmj command; raise RuntimeError unless it exits with 0."""
    ecode, out, err = _run_command(args)
    # A negative returncode means the command was killed by a signal, which
    # must count as a failure too
    if ecode != 0:
        logger.critical('Error: %s, %s',
                        out.decode(errors='replace'), err.decode(errors='replace'))
        raise RuntimeError('Unable to run %s' % what)


def mlmmj_subunsub(remote: Set[str], local: Set[str], mldir: str) -> None:
    """Apply the difference between *remote* and *local* to the list in *mldir*.

    - Addresses only in *remote* are subscribed with mlmmj-sub.
    - Addresses only in *local* are unsubscribed with mlmmj-unsub.

    Each successful change is appended to ``.external-subscribers.log`` in
    *mldir*.

    Raises RuntimeError on the first mlmmj command that fails. Changes made
    before that point are not rolled back.
    """
    # Make a local log. The handler is removed and closed on the way out so
    # repeated calls neither leak file descriptors nor duplicate log lines.
    ll = logging.getLogger(mldir)
    ll.setLevel(logging.DEBUG)
    lch = logging.FileHandler(os.path.join(mldir, '.external-subscribers.log'))
    lch.setFormatter(logging.Formatter('[%(asctime)s] %(message)s'))
    lch.setLevel(logging.INFO)
    ll.addHandler(lch)
    try:
        # sorted() gives a reproducible order for the commands and the logs
        for addr in sorted(remote - local):
            logger.info('Subscribing %s', addr)
            _run_mlmmj([MLMMJ_SUB, '-L', mldir, '-f', '-q', '-s', '-a', addr], 'mlmmj_sub')
            ll.info('subscribed %s', addr)
        for addr in sorted(local - remote):
            logger.info('Unsubscribing %s', addr)
            _run_mlmmj([MLMMJ_UNSUB, '-L', mldir, '-q', '-s', '-a', addr], 'mlmmj_unsub')
            ll.info('unsubscribed %s', addr)
    finally:
        ll.removeHandler(lch)
        lch.close()
| |
| |
def subscriber_sync(cmdargs: argparse.Namespace) -> None:
    """Sync every mlmmj list found in ``cmdargs.mlmmj_spool_dir``.

    For each list with an external subscriber source, the remote subscriber
    set is compared with the one recorded by the last run. The difference
    is applied via mlmmj-sub/mlmmj-unsub.

    A list is skipped when:
    - its source is missing, invalid or unchanged;
    - the remote set is empty;
    - another process holds its lock file.

    Exits the process with status 1 if an mlmmj command fails.
    """
    spool = cmdargs.mlmmj_spool_dir
    # sorted() makes the processing order (and the logs) reproducible
    for entry in sorted(os.listdir(spool)):
        mldir = os.path.join(spool, entry)
        try:
            csum, remote = get_remote_subscribers(mldir)
        except OSError:
            # FileNotFoundError, FileExistsError and TimeoutError are all
            # OSError subclasses
            continue
        logger.info('Processing %s', entry)
        # if remote is empty, this is sus -- something probably went wrong
        if not remote:
            logger.info('Remote is empty, this is sus')
            continue
        # Lock it, so there are no clashes. The with-block guarantees the
        # lock file is closed on every path, including `continue`.
        lockfile = os.path.join(mldir, '.mlmmj-subscriber-sync.lock')
        with open(lockfile, 'w') as lfh:
            try:
                lockf(lfh, LOCK_EX | LOCK_NB)
            except OSError:
                logger.info('Unable to lock %s, assuming it is busy', entry)
                continue
            try:
                local = get_last_subscribers(mldir)
                if local == remote:
                    logger.info('No change for %s', entry)
                    if csum:
                        # Remember the checksum so the unchanged file is not
                        # downloaded and compared again on the next run
                        store_last(remote, csum, mldir)
                    continue
                try:
                    mlmmj_subunsub(remote, local, mldir)
                    store_last(remote, csum, mldir)
                except RuntimeError:
                    logger.critical('Unable to run mlmmj commands, exiting in panic')
                    sys.exit(1)
            finally:
                lockf(lfh, LOCK_UN)
| |
| |
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Sync mlmmj list subscribers with an external subscriber file')
    parser.add_argument('-q', '--quiet', action='store_true',
                        default=False, help='Print critical output only')
    parser.add_argument('--mlmmj-spool-dir',
                        default='/var/spool/mlmmj',
                        help='Where mlmmj lists are, if not in /var/spool/mlmmj')
    parser.add_argument('--sleep-upper', type=int, default=60,
                        help='Upper range for sleep, use 0 to disable')
    _args = parser.parse_args()

    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter('%(message)s'))
    # --quiet lets only critical messages through to the console
    ch.setLevel(logging.CRITICAL if _args.quiet else logging.INFO)
    logger.addHandler(ch)

    # Random start-up delay (presumably to stagger cron-started runs -- verify)
    if _args.sleep_upper > 0:
        import random
        import time
        # randrange(10, upper, 5) raises ValueError for upper <= 10, so small
        # upper bounds use a range starting at 0 instead
        lower = 10 if _args.sleep_upper > 10 else 0
        sn = random.randrange(lower, _args.sleep_upper, 5)
        logger.info('Sleeping %s seconds', sn)
        time.sleep(sn)

    subscriber_sync(_args)