blob: 7abc45a508bf44e8b4efd4d793d38f9ae80246b6 [file] [log] [blame]
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright 2016 - Clark Williams <williams@redhat.com>
# Copyright 2021 - John Kacur <jkacur@redhat.com>
# Copyright 2023 - Tomas Glozar <tglozar@redhat.com>
#
"""Module providing utility functions for working with CPU lists"""
import os
cpupath = "/sys/devices/system/cpu"
def sysread(path, obj):
""" Helper function for reading system files """
with open(os.path.join(path, obj), "r") as fp:
return fp.readline().strip()
def _online_file_exists():
""" Check whether machine / kernel is configured with online file """
# Note: some machines do not have cpu0/online so we check cpu1/online.
# In the case of machines with a single CPU, there is no cpu1, but
# that is not a problem, since a single CPU cannot be offline
return os.path.exists(os.path.join(cpupath, "cpu1/online"))
def _isolated_file_exists():
""" Check whether machine / kernel is configured with isolated file """
return os.path.exists(os.path.join(cpupath, "isolated"))
def collapse_cpulist(cpulist):
"""
Collapse a list of cpu numbers into a string range
of cpus (e.g. 0-5, 7, 9)
"""
cur_range = [None, None]
result = []
for cpu in cpulist + [None]:
if cur_range[0] is None:
cur_range[0] = cur_range[1] = cpu
continue
if cpu is not None and cpu == cur_range[1] + 1:
# Extend currently processed range
cur_range[1] += 1
else:
# Range processing finished, add range to string
result.append(f"{cur_range[0]}-{cur_range[1]}"
if cur_range[0] != cur_range[1]
else str(cur_range[0]))
# Reset
cur_range[0] = cur_range[1] = cpu
return ",".join(result)
def compress_cpulist(cpulist):
""" return a string representation of cpulist """
if not cpulist:
return ""
if isinstance(cpulist[0], int):
return ",".join(str(e) for e in cpulist)
return ",".join(cpulist)
def expand_cpulist(cpulist):
""" expand a range string into an array of cpu numbers
don't error check against online cpus
"""
result = []
if not cpulist:
return result
for part in cpulist.split(','):
if '-' in part:
a, b = part.split('-')
a, b = int(a), int(b)
result.extend(list(range(a, b + 1)))
else:
a = int(part)
result.append(a)
return [int(i) for i in list(set(result))]
def is_online(n):
""" check whether cpu n is online """
path = os.path.join(cpupath, f'cpu{n}')
# Some hardware doesn't allow cpu0 to be turned off
if not os.path.exists(path + '/online') and n == 0:
return True
return sysread(path, "online") == "1"
def online_cpulist(cpulist):
""" Given a cpulist, return a cpulist of online cpus """
# This only works if the sys online files exist
if not _online_file_exists():
return cpulist
newlist = []
for cpu in cpulist:
if not _online_file_exists() and cpu == '0':
newlist.append(cpu)
elif is_online(int(cpu)):
newlist.append(cpu)
return newlist
def isolated_cpulist(cpulist):
"""Given a cpulist, return a cpulist of isolated CPUs"""
if not _isolated_file_exists():
return cpulist
isolated_cpulist = sysread(cpupath, "isolated")
isolated_cpulist = expand_cpulist(isolated_cpulist)
return list(set(isolated_cpulist) & set(cpulist))
def nonisolated_cpulist(cpulist):
"""Given a cpulist, return a cpulist of non-isolated CPUs"""
if not _isolated_file_exists():
return cpulist
isolated_cpulist = sysread(cpupath, "isolated")
isolated_cpulist = expand_cpulist(isolated_cpulist)
return list(set(cpulist).difference(set(isolated_cpulist)))
def is_relative(cpulist):
return cpulist.startswith("+") or cpulist.startswith("-")
def expand_relative_cpulist(cpulist):
"""
Expand a relative cpulist into a tuple of lists.
:param cpulist: Relative cpulist of form +1,2,3,-4,5,6
:return: Tuple of two lists, one for added CPUs, one for removed CPUs
"""
added_cpus = []
removed_cpus = []
if not cpulist:
return added_cpus, removed_cpus
cpus = None
for part in cpulist.split(','):
if part.startswith('+') or part.startswith('-'):
cpus = added_cpus if part[0] == '+' else removed_cpus
part = part[1:]
if '-' in part:
a, b = part.split('-')
a, b = int(a), int(b)
cpus.extend(list(range(a, b + 1)))
else:
a = int(part)
cpus.append(a)
return list(set(added_cpus)), list(set(removed_cpus))