| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # |
| # Copyright 2009 - 2013 Clark Williams <williams@redhat.com> |
| # Copyright 2009 - 2013 David Sommerseth <davids@redhat.com> |
| # Copyright 2012 - 2013 Raphaƫl Beamonte <raphael.beamonte@gmail.com> |
| # |
| |
| import sys |
| import subprocess |
| import os |
| import libxml2 |
| from rteval.sysinfo.tools import getcmdpath |
| from rteval.Log import Log |
| |
| |
| class KernelInfo: |
| def __init__(self, logger=None): |
| self.__logger = logger |
| |
| |
| def __log(self, logtype, msg): |
| if self.__logger: |
| self.__logger.log(logtype, msg) |
| |
| |
| def kernel_get_kthreads(self): |
| policies = {'DLN': 'deadline', 'FF':'fifo', 'RR':'rrobin', 'TS':'other', '?':'unknown'} |
| ret_kthreads = {} |
| self.__log(Log.DEBUG, "getting kthread status") |
| cmd = f"{getcmdpath('ps')} -eocommand,pid,policy,rtprio,comm" |
| self.__log(Log.DEBUG, f"cmd: {cmd}") |
| c = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) |
| for p in c.stdout: |
| v = p.strip().split() |
| kcmd = v.pop(0) |
| kcmd = bytes.decode(kcmd) |
| try: |
| if int(v[0]) > 0 and kcmd.startswith('[') and kcmd.endswith(']'): |
| |
| ret_kthreads[v[0]] = {'policy' : policies[bytes.decode(v[1])], |
| 'priority' : v[2], 'name' : v[3]} |
| except ValueError: |
| pass # Ignore lines which don't have a number in the first row |
| return ret_kthreads |
| |
| |
| def kernel_get_modules(self): |
| modlist = [] |
| try: |
| fp = open('/proc/modules', 'r') |
| line = fp.readline() |
| while line: |
| mod = line.split() |
| modlist.append({"modname": mod[0], |
| "modsize": mod[1], |
| "numusers": mod[2], |
| "usedby": mod[3], |
| "modstate": mod[4]}) |
| line = fp.readline() |
| fp.close() |
| except Exception as err: |
| raise err |
| return modlist |
| |
| |
| def kernel_get_clocksources(self): |
| '''get the available and curent clocksources for this kernel''' |
| path = '/sys/devices/system/clocksource/clocksource0' |
| if not os.path.exists(path): |
| raise RuntimeError("Can't find clocksource path in /sys") |
| f = open(os.path.join(path, "current_clocksource")) |
| current_clocksource = f.readline().strip() |
| f = open(os.path.join(path, "available_clocksource")) |
| available_clocksource = f.readline().strip() |
| f.close() |
| return (current_clocksource, available_clocksource) |
| |
| |
| def MakeReport(self): |
| rep_n = libxml2.newNode("Kernel") |
| |
| clksrc = self.kernel_get_clocksources() |
| clock_n = libxml2.newNode("ClockSource") |
| rep_n.addChild(clock_n) |
| for avail in clksrc[1].split(): |
| avail_n = libxml2.newNode("source") |
| avail_n.addContent(avail) |
| if avail == clksrc[0]: |
| avail_n.newProp("current", "1") |
| clock_n.addChild(avail_n) |
| |
| mods_n = libxml2.newNode("Modules") |
| rep_n.addChild(mods_n) |
| |
| for mod in self.kernel_get_modules(): |
| mod_n = libxml2.newNode("Module") |
| mods_n.addChild(mod_n) |
| |
| mod_n.newProp("name", mod["modname"]) |
| |
| mod_n.newProp("size", str(mod["modsize"])) |
| mod_n.newProp("state", mod["modstate"]) |
| mod_n.newProp("numusers", str(mod["numusers"])) |
| |
| if mod["usedby"] != "-": |
| usedby_n = libxml2.newNode("usedby") |
| mod_n.addChild(usedby_n) |
| for ub in mod["usedby"].split(","): |
| if len(ub): |
| ub_n = libxml2.newNode("module") |
| ub_n.addContent(ub) |
| usedby_n.addChild(ub_n) |
| |
| |
| kthreads_n = libxml2.newNode("kthreads") |
| rep_n.addChild(kthreads_n) |
| |
| kthreads = self.kernel_get_kthreads() |
| keys = list(kthreads.keys()) |
| if len(keys): |
| keys.sort() |
| for pid in keys: |
| kthri_n = libxml2.newNode("thread") |
| kthreads_n.addChild(kthri_n) |
| kthri_n.addContent(bytes.decode(kthreads[pid]["name"])) |
| kthri_n.newProp("policy", kthreads[pid]["policy"]) |
| kthri_n.newProp("priority", bytes.decode(kthreads[pid]["priority"])) |
| |
| return rep_n |
| |
| |
| |
| def unit_test(rootdir): |
| try: |
| from pprint import pprint |
| log = Log() |
| log.SetLogVerbosity(Log.INFO|Log.DEBUG) |
| |
| ki = KernelInfo(logger=log) |
| pprint(ki.kernel_get_kthreads()) |
| pprint(ki.kernel_get_modules()) |
| pprint(ki.kernel_get_clocksources()) |
| |
| ki_xml = ki.MakeReport() |
| xml_d = libxml2.newDoc("1.0") |
| xml_d.setRootElement(ki_xml) |
| xml_d.saveFormatFileEnc("-", "UTF-8", 1) |
| |
| except Exception as e: |
| import traceback |
| traceback.print_exc(file=sys.stdout) |
| print("** EXCEPTION %s", str(e)) |
| return 1 |
| |
| if __name__ == '__main__': |
| unit_test(None) |