blob: 5b76b49adab84d9828682677288ae61fd58c0858 [file] [log] [blame]
#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0+
# Copyright (C) 2018 Oracle. All rights reserved.
#
# Author: Darrick J. Wong <darrick.wong@oracle.com>
# Run online scrubbers in parallel, but avoid thrashing.
import subprocess
import json
import threading
import time
import sys
import os
import argparse
# Exit status accumulated across all scrub runs; run_scrub() ORs each
# scrub's return code into it and main() collapses it to 0 or 1 on exit.
retcode = 0
# Set to True by main() on KeyboardInterrupt; run_scrub() checks it and
# returns early instead of launching further commands.
terminate = False
def DEVNULL():
    '''Hand back an output sink that subprocess can write to.

    The subprocess.DEVNULL constant is preferred.  If this interpreter's
    subprocess module does not export it, a writable binary handle on
    os.devnull is opened and returned instead.
    '''
    try:
        from subprocess import DEVNULL as sink
    except ImportError:
        sink = open(os.devnull, 'wb')
    return sink
def find_mounts():
    '''Map mountpoints to physical disks.

    Runs lsblk with JSON output and walks the device tree it reports.

    Returns a dict that maps the mountpoint of every mounted XFS
    filesystem to the set of kernel names (KNAME) of the top-level block
    devices it was found under.  The dict is empty if lsblk cannot be
    started or exits with a nonzero status.
    '''
    def find_xfs_mounts(bdev, fs, lastdisk):
        '''Attach lastdisk to each fs found under bdev.'''
        if bdev['fstype'] == 'xfs' and bdev['mountpoint'] is not None:
            fs.setdefault(bdev['mountpoint'], set()).add(lastdisk)
        # Leaf devices carry no 'children' key at all.
        for child in bdev.get('children', ()):
            find_xfs_mounts(child, fs, lastdisk)

    fs = {}
    cmd = ['lsblk', '-o', 'NAME,KNAME,TYPE,FSTYPE,MOUNTPOINT', '-J']
    try:
        result = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    except OSError:
        # lsblk is missing or not executable; treat it like a failed run.
        return fs

    # communicate() drains the pipe while it waits.  Calling wait() before
    # reading can deadlock once lsblk has filled the pipe buffer.
    raw, _ = result.communicate()
    if result.returncode != 0:
        return fs

    # sys.stdout.encoding may be None when stdout has been replaced.
    output = raw.decode(sys.stdout.encoding or 'utf-8')
    bdevdata = json.loads(output)

    # The lsblk output had better be in disks-then-partitions order
    for bdev in bdevdata['blockdevices']:
        find_xfs_mounts(bdev, fs, bdev['kname'])

    return fs
def kill_systemd(unit, proc):
    '''Stop a systemd unit that was started on our behalf.

    proc is first sent terminate(); then "systemctl stop" is run on unit
    and waited for.
    '''
    proc.terminate()
    stopper = subprocess.Popen(['systemctl', 'stop', unit])
    stopper.wait()
def run_killable(cmd, stdout, killfuncs, kill_fn):
    '''Run a killable program.

    cmd is the argv list to execute and stdout is handed to
    subprocess.Popen as the child's stdout.  While the child runs, a
    zero-argument callable that invokes kill_fn(proc) is kept in the
    killfuncs set so that another thread can stop the program.

    Returns the program's return code, or -1 if we can't start it.
    '''
    try:
        proc = subprocess.Popen(cmd, stdout=stdout)
    except (OSError, ValueError):
        # Only a failure to launch maps to -1; other errors are real bugs
        # and must not be swallowed.
        return -1

    real_kill_fn = lambda: kill_fn(proc)
    killfuncs.add(real_kill_fn)
    try:
        proc.wait()
    finally:
        # discard() tolerates the entry having already been popped by
        # whoever ran the kill function.
        killfuncs.discard(real_kill_fn)
    return proc.returncode
# systemd doesn't like unit instance names with slashes in them, so it
# replaces them with dashes when it invokes the service. However, it's not
# smart enough to convert the dashes to something else, so when it unescapes
# the instance name to feed to xfs_scrub, it turns all dashes into slashes.
# "/moo-cow" becomes "-moo-cow" becomes "/moo/cow", which is wrong. systemd
# actually /can/ escape the dashes correctly if it is told that this is a path
# (and not a unit name), but it didn't do this prior to January 2017, so fix
# this for them.
#
# systemd path escaping also drops the initial slash so we add that back in so
# that log messages from the service units preserve the full path and users can
# look up log messages using full paths. However, for "/" the escaping rules
# do /not/ drop the initial slash, so we have to special-case that here.
def systemd_escape(path):
    '''Escape a path so that systemd does not mangle it in a unit name.

    Returns '-' for the root directory.  For any other path, returns '-'
    followed by the first line printed by "systemd-escape --path".  If
    systemd-escape cannot be run, exits with a nonzero status, or prints
    nothing usable, the path is returned unescaped.
    '''
    if path == '/':
        return '-'

    cmd = ['systemd-escape', '--path', path]
    try:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    except (OSError, ValueError):
        return path

    # communicate() reads the pipe while waiting and closes it afterwards.
    raw, _ = proc.communicate()
    if proc.returncode != 0:
        return path

    try:
        # sys.stdout.encoding may be None when stdout has been replaced.
        lines = raw.decode(sys.stdout.encoding or 'utf-8').splitlines()
    except UnicodeError:
        return path

    # Never fall off the end: no output means we could not escape.
    if not lines:
        return path
    return '-' + lines[0].strip()
def run_scrub(mnt, cond, running_devs, mntdevs, killfuncs):
    '''Run a scrub process.

    Scrubs the filesystem mounted at mnt, first by starting the
    xfs_scrub@ systemd unit and, if that does not return 0 or 1, by
    invoking xfs_scrub directly.  The scrub's return code is ORed into
    the global retcode.

    mnt          -- mountpoint to scrub
    cond         -- threading.Condition notified when this run is over
    running_devs -- shared set of busy devices; mntdevs is removed from
                    it when this run is over
    mntdevs      -- set of devices backing mnt
    killfuncs    -- shared set handed to run_killable for registering
                    kill functions
    '''
    global retcode, terminate

    print("Scrubbing %s..." % mnt)
    sys.stdout.flush()

    try:
        if terminate:
            return

        # Try it the systemd way.  Compute the unit name once so that the
        # kill function stops exactly the unit that was started.
        unit = 'xfs_scrub@%s' % systemd_escape(mnt)
        cmd = ['systemctl', 'start', unit]
        ret = run_killable(cmd, DEVNULL(), killfuncs,
                           lambda proc: kill_systemd(unit, proc))
        if ret == 0 or ret == 1:
            print("Scrubbing %s done, (err=%d)" % (mnt, ret))
            sys.stdout.flush()
            # Several scrub threads update retcode; serialize the
            # read-modify-write.
            with cond:
                retcode |= ret
            return

        if terminate:
            return

        # Invoke xfs_scrub manually.  The substituted argument string may
        # hold several options (or none), so split it into separate argv
        # entries instead of passing it as one word.
        cmd = ['@sbindir@/xfs_scrub'] + '@scrub_args@'.split() + [mnt]
        ret = run_killable(cmd, None, killfuncs,
                           lambda proc: proc.terminate())
        if ret >= 0:
            print("Scrubbing %s done, (err=%d)" % (mnt, ret))
            sys.stdout.flush()
            with cond:
                retcode |= ret
            return

        if terminate:
            return

        print("Unable to start scrub tool.")
        sys.stdout.flush()
    finally:
        # Release our devices and wake the scheduler under the same lock
        # so it never observes one without the other.
        with cond:
            running_devs.difference_update(mntdevs)
            cond.notify()
def main():
    '''Find mounts, schedule scrub runs.

    Parses the command line, discovers mounted XFS filesystems with
    find_mounts(), and starts one run_scrub thread per filesystem such
    that no two running scrubs share a device.  Once every scrub has
    finished, exits with status 0 if the accumulated retcode is zero and
    1 otherwise.  A KeyboardInterrupt sets the terminate flag and runs
    all registered kill functions.
    '''
    global retcode, terminate

    workers = []

    def thr(mnt, devs):
        '''Start a scrub thread for mnt and remember it for joining.'''
        a = (mnt, cond, running_devs, devs, killfuncs)
        worker = threading.Thread(target=run_scrub, args=a)
        workers.append(worker)
        worker.start()

    def stop_everything():
        '''Flag termination and run every registered kill function.'''
        global terminate
        terminate = True
        print("Terminating...")
        sys.stdout.flush()
        while True:
            try:
                fn = killfuncs.pop()
            except KeyError:
                # Empty, possibly because a worker just removed its entry.
                break
            fn()

    parser = argparse.ArgumentParser(
        description="Scrub all mounted XFS filesystems.")
    parser.add_argument("-V", help="Report version and exit.",
                        action="store_true")
    args = parser.parse_args()

    if args.V:
        print("xfs_scrub_all version @pkg_version@")
        sys.exit(0)

    fs = find_mounts()

    # Tail the journal if we ourselves aren't a service...
    journalthread = None
    if 'SERVICE_MODE' not in os.environ:
        try:
            cmd = ['journalctl', '--no-pager', '-q', '-S', 'now',
                   '-f', '-u', 'xfs_scrub@*', '-o',
                   'cat']
            journalthread = subprocess.Popen(cmd)
        except OSError:
            # journalctl is unavailable; carry on without log tailing.
            pass

    # Schedule scrub jobs...
    running_devs = set()
    killfuncs = set()
    cond = threading.Condition()

    # Hold the condition's lock while scheduling.  Workers need it to
    # notify, so none of them can signal completion before we are inside
    # wait(); this closes the lost-wakeup window.
    with cond:
        while fs:
            if not running_devs:
                mnt, devs = fs.popitem()
                running_devs.update(devs)
                thr(mnt, devs)
            poppers = set()
            for mnt, devs in fs.items():
                # Runnable only if it shares no device with a running scrub.
                if running_devs.isdisjoint(devs):
                    running_devs.update(devs)
                    poppers.add(mnt)
                    thr(mnt, devs)
            for p in poppers:
                fs.pop(p)
            try:
                cond.wait()
            except KeyboardInterrupt:
                stop_everything()
                fs = {}

    # Everything has been scheduled, but scrubs may still be running.
    # Wait for them so that retcode is complete and the journal tail
    # stays alive until the last scrub is done.
    for worker in workers:
        while worker.is_alive():
            try:
                worker.join()
            except KeyboardInterrupt:
                stop_everything()

    if journalthread is not None:
        journalthread.terminate()

    # See the service mode comments in xfs_scrub.c for why we do this.
    if 'SERVICE_MODE' in os.environ:
        time.sleep(2)

    if retcode != 0:
        retcode = 1
    sys.exit(retcode)
# Run the scheduler only when executed as a script, not when imported.
if __name__ == '__main__':
    main()