| #!/usr/bin/python3 |
| |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # Copyright (C) 2018-2024 Oracle. All rights reserved. |
| # |
| # Author: Darrick J. Wong <djwong@kernel.org> |
| |
| # Run online scrubbers in parallel, but avoid thrashing. |
| |
| import subprocess |
| import json |
| import threading |
| import time |
| import sys |
| import os |
| import argparse |
| import signal |
| from io import TextIOWrapper |
| |
| retcode = 0 |
| terminate = False |
| |
| def DEVNULL(): |
| '''Return /dev/null in subprocess writable format.''' |
| try: |
| from subprocess import DEVNULL |
| return DEVNULL |
| except ImportError: |
| return open(os.devnull, 'wb') |
| |
| def find_mounts(): |
| '''Map mountpoints to physical disks.''' |
| def find_xfs_mounts(bdev, fs, lastdisk): |
| '''Attach lastdisk to each fs found under bdev.''' |
| if bdev['fstype'] == 'xfs' and bdev['mountpoint'] is not None: |
| mnt = bdev['mountpoint'] |
| if mnt in fs: |
| fs[mnt].add(lastdisk) |
| else: |
| fs[mnt] = set([lastdisk]) |
| if 'children' not in bdev: |
| return |
| for child in bdev['children']: |
| find_xfs_mounts(child, fs, lastdisk) |
| |
| fs = {} |
| cmd=['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J'] |
| result = subprocess.Popen(cmd, stdout=subprocess.PIPE) |
| result.wait() |
| if result.returncode != 0: |
| return fs |
| sarray = [x.decode(sys.stdout.encoding) for x in result.stdout.readlines()] |
| output = ' '.join(sarray) |
| bdevdata = json.loads(output) |
| |
| # The lsblk output had better be in disks-then-partitions order |
| for bdev in bdevdata['blockdevices']: |
| lastdisk = bdev['kname'] |
| find_xfs_mounts(bdev, fs, lastdisk) |
| |
| return fs |
| |
| def backtick(cmd): |
| '''Generator function that yields lines of a program's stdout.''' |
| p = subprocess.Popen(cmd, stdout = subprocess.PIPE) |
| for line in TextIOWrapper(p.stdout, encoding="utf-8"): |
| yield line.strip() |
| |
| def remove_killfunc(killfuncs, fn): |
| '''Ensure fn is not in killfuncs.''' |
| try: |
| killfuncs.remove(fn) |
| except: |
| pass |
| |
| def run_killable(cmd, stdout, killfuncs): |
| '''Run a killable program. Returns program retcode or -1 if we can't |
| start it.''' |
| try: |
| proc = subprocess.Popen(cmd, stdout = stdout) |
| killfuncs.add(proc.terminate) |
| proc.wait() |
| remove_killfunc(killfuncs, proc.terminate) |
| return proc.returncode |
| except: |
| return -1 |
| |
| # systemd doesn't like unit instance names with slashes in them, so it |
| # replaces them with dashes when it invokes the service. Filesystem paths |
| # need a special --path argument so that dashes do not get mangled. |
| def path_to_serviceunit(path): |
| '''Convert a pathname into a systemd service unit name.''' |
| |
| cmd = ['systemd-escape', '--template', '@scrub_svcname@', |
| '--path', path] |
| try: |
| proc = subprocess.Popen(cmd, stdout = subprocess.PIPE) |
| proc.wait() |
| for line in proc.stdout: |
| return line.decode(sys.stdout.encoding).strip() |
| except: |
| return None |
| |
| def systemctl_stop(unitname): |
| '''Stop a systemd unit.''' |
| cmd = ['systemctl', 'stop', unitname] |
| x = subprocess.Popen(cmd) |
| x.wait() |
| |
| def systemctl_start(unitname, killfuncs): |
| '''Start a systemd unit and wait for it to complete.''' |
| stop_fn = None |
| cmd = ['systemctl', 'start', unitname] |
| try: |
| proc = subprocess.Popen(cmd, stdout = DEVNULL()) |
| stop_fn = lambda: systemctl_stop(unitname) |
| killfuncs.add(stop_fn) |
| proc.wait() |
| ret = proc.returncode |
| except: |
| if stop_fn is not None: |
| remove_killfunc(killfuncs, stop_fn) |
| return -1 |
| |
| if ret != 1: |
| remove_killfunc(killfuncs, stop_fn) |
| return ret |
| |
| # If systemctl-start returns 1, it's possible that the service failed |
| # or that dbus/systemd restarted and the client program lost its |
| # connection -- according to the systemctl man page, 1 means "unit not |
| # failed". |
| # |
| # Either way, we switch to polling the service status to try to wait |
| # for the service to end. As of systemd 249, the is-active command |
| # returns any of the following states: active, reloading, inactive, |
| # failed, activating, deactivating, or maintenance. Apparently these |
| # strings are not localized. |
| while True: |
| try: |
| for l in backtick(['systemctl', 'is-active', unitname]): |
| if l == 'failed': |
| remove_killfunc(killfuncs, stop_fn) |
| return 1 |
| if l == 'inactive': |
| remove_killfunc(killfuncs, stop_fn) |
| return 0 |
| except: |
| remove_killfunc(killfuncs, stop_fn) |
| return -1 |
| |
| time.sleep(1) |
| |
| def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs): |
| '''Run a scrub process.''' |
| global retcode, terminate |
| |
| print("Scrubbing %s..." % mnt) |
| sys.stdout.flush() |
| |
| try: |
| if terminate: |
| return |
| |
| # Try it the systemd way |
| unitname = path_to_serviceunit(path) |
| if unitname is not None: |
| ret = systemctl_start(unitname, killfuncs) |
| if ret == 0 or ret == 1: |
| print("Scrubbing %s done, (err=%d)" % (mnt, ret)) |
| sys.stdout.flush() |
| retcode |= ret |
| return |
| |
| if terminate: |
| return |
| |
| # Invoke xfs_scrub manually |
| cmd = ['@sbindir@/xfs_scrub'] |
| cmd += '@scrub_args@'.split() |
| cmd += [mnt] |
| ret = run_killable(cmd, None, killfuncs) |
| if ret >= 0: |
| print("Scrubbing %s done, (err=%d)" % (mnt, ret)) |
| sys.stdout.flush() |
| retcode |= ret |
| return |
| |
| if terminate: |
| return |
| |
| print("Unable to start scrub tool.") |
| sys.stdout.flush() |
| finally: |
| running_devs -= mntdevs |
| cond.acquire() |
| cond.notify() |
| cond.release() |
| |
| def signal_scrubs(signum, cond): |
| '''Handle termination signals by killing xfs_scrub children.''' |
| global debug, terminate |
| |
| if debug: |
| print('Signal handler called with signal', signum) |
| sys.stdout.flush() |
| |
| terminate = True |
| cond.acquire() |
| cond.notify() |
| cond.release() |
| |
| def wait_for_termination(cond, killfuncs): |
| '''Wait for a child thread to terminate. Returns True if we should |
| abort the program, False otherwise.''' |
| global debug, terminate |
| |
| if debug: |
| print('waiting for threads to terminate') |
| sys.stdout.flush() |
| |
| cond.acquire() |
| try: |
| cond.wait() |
| except KeyboardInterrupt: |
| terminate = True |
| cond.release() |
| |
| if not terminate: |
| return False |
| |
| print("Terminating...") |
| sys.stdout.flush() |
| while len(killfuncs) > 0: |
| fn = killfuncs.pop() |
| fn() |
| return True |
| |
| def main(): |
| '''Find mounts, schedule scrub runs.''' |
| def thr(mnt, devs): |
| a = (mnt, cond, running_devs, devs, killfuncs) |
| thr = threading.Thread(target = run_scrub, args = a) |
| thr.start() |
| global retcode, terminate |
| |
| parser = argparse.ArgumentParser( \ |
| description = "Scrub all mounted XFS filesystems.") |
| parser.add_argument("-V", help = "Report version and exit.", \ |
| action = "store_true") |
| args = parser.parse_args() |
| |
| if args.V: |
| print("xfs_scrub_all version @pkg_version@") |
| sys.exit(0) |
| |
| fs = find_mounts() |
| |
| # Tail the journal if we ourselves aren't a service... |
| journalthread = None |
| if 'SERVICE_MODE' not in os.environ: |
| try: |
| cmd=['journalctl', '--no-pager', '-q', '-S', 'now', \ |
| '-f', '-u', 'xfs_scrub@*', '-o', \ |
| 'cat'] |
| journalthread = subprocess.Popen(cmd) |
| except: |
| pass |
| |
| # Schedule scrub jobs... |
| running_devs = set() |
| killfuncs = set() |
| cond = threading.Condition() |
| |
| signal.signal(signal.SIGINT, lambda s, f: signal_scrubs(s, cond)) |
| signal.signal(signal.SIGTERM, lambda s, f: signal_scrubs(s, cond)) |
| |
| while len(fs) > 0: |
| if len(running_devs) == 0: |
| mnt, devs = fs.popitem() |
| running_devs.update(devs) |
| thr(mnt, devs) |
| poppers = set() |
| for mnt in fs: |
| devs = fs[mnt] |
| can_run = True |
| for dev in devs: |
| if dev in running_devs: |
| can_run = False |
| break |
| if can_run: |
| running_devs.update(devs) |
| poppers.add(mnt) |
| thr(mnt, devs) |
| for p in poppers: |
| fs.pop(p) |
| |
| # Wait for one thread to finish |
| if wait_for_termination(cond, killfuncs): |
| break |
| |
| # Wait for the rest of the threads to finish |
| while len(killfuncs) > 0: |
| wait_for_termination(cond, killfuncs) |
| |
| if journalthread is not None: |
| journalthread.terminate() |
| |
| # See the service mode comments in xfs_scrub.c for why we do this. |
| if 'SERVICE_MODE' in os.environ: |
| time.sleep(2) |
| if retcode != 0: |
| retcode = 1 |
| |
| sys.exit(retcode) |
| |
| if __name__ == '__main__': |
| main() |