blob: 4330a839db6f00098ac73708e0be90877a53e2f5 [file] [log] [blame]
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright 2012 - 2013 David Sommerseth <davids@redhat.com>
#
import time
from datetime import datetime
import threading
import argparse
import libxml2
from rteval.Log import Log
from rteval.rtevalConfig import rtevalCfgSection
__all__ = ["rtevalRuntimeError", "rtevalModulePrototype", "ModuleContainer", "RtEvalModules"]
class rtevalRuntimeError(RuntimeError):
def __init__(self, mod, message):
RuntimeError.__init__(self, message)
# The module had a RuntimeError, we set the flag
mod._setRuntimeError()
class rtevalModulePrototype(threading.Thread):
""" Prototype rteval modules class - to be inherited by the real module """
def __init__(self, modtype, name, logger=None):
if logger and not isinstance(logger, Log):
raise TypeError("logger attribute is not a Log() object")
threading.Thread.__init__(self)
self._module_type = modtype
self._name = name
self.__logger = logger
self.__ready = False
self.__runtimeError = False
self.__events = {"start": threading.Event(),
"stop": threading.Event(),
"finished": threading.Event()}
self._donotrun = False
self._exclusive = False
self.__timestamps = {}
self.__sleeptime = 2.0
def _log(self, logtype, msg):
""" Common log function for rteval modules """
if self.__logger:
self.__logger.log(logtype, f"[{self._name}] {msg}")
def isReady(self):
""" Returns a boolean if the module is ready to run """
if self._donotrun:
return True
return self.__ready
def is_exclusive(self):
""" Returns true if this workload should run alone """
return self._exclusive
def set_exclusive(self):
""" Sets This module to run alone """
self._exclusive = True
def set_donotrun(self):
""" set a module's donotrun field to True """
self._donotrun = True
def _setReady(self, state=True):
""" Sets the ready flag for the module """
self.__ready = state
def hadRuntimeError(self):
""" Returns a boolean if the module had a RuntimeError """
return self.__runtimeError
def _setRuntimeError(self, state=True):
""" Sets the runtimeError flag for the module """
self.__runtimeError = state
def setStart(self):
""" Sets the start event state """
self.__events["start"].set()
self.__timestamps["start_set"] = datetime.now()
def shouldStart(self):
"""Returns the start event state - indicating the module can start """
return self.__events["start"].isSet()
def setStop(self):
""" Sets the stop event state """
self.__events["stop"].set()
self.__timestamps["stop_set"] = datetime.now()
def shouldStop(self):
""" Returns the stop event state - indicating the module should stop """
return self.__events["stop"].isSet()
def _setFinished(self):
""" Sets the finished event state - indicating the module has completed
"""
self.__events["finished"].set()
self.__timestamps["finished_set"] = datetime.now()
def WaitForCompletion(self, wtime=None):
""" Blocks until the module has completed its workload """
if not self.shouldStart():
# If it hasn't been started yet, nothing to wait for
return None
return self.__events["finished"].wait(wtime)
def _WorkloadSetup(self):
""" Required module method, which purpose is to do the initial workload
setup, preparing for _WorkloadBuild()
"""
raise NotImplementedError(f"_WorkloadSetup() method must be implemented in the {self._name} module")
def _WorkloadBuild(self):
""" Required module method, which purpose is to compile additional code
needed for the worklaod
"""
raise NotImplementedError(f"_WorkloadBuild() method must be implemented in the {self._name} module")
def _WorkloadPrepare(self):
""" Required module method, which will initialise and prepare the
workload just before it is about to start
"""
raise NotImplementedError(f"_WorkloadPrepare() method must be implemented in the {self._name} module")
def _WorkloadTask(self):
""" Required module method, which kicks off the workload """
raise NotImplementedError(f"_WorkloadTask() method must be implemented in the {self._name} module")
def WorkloadAlive(self):
""" Required module method, which should return True if the workload is
still alive
"""
raise NotImplementedError(f"WorkloadAlive() method must be implemented in the {self._name} module")
def _WorkloadCleanup(self):
""" Required module method, which will be run after the _WorkloadTask()
has completed or been aborted by the 'stop event flag'
"""
raise NotImplementedError(f"_WorkloadCleanup() method must be implemented in the {self._name} module")
def WorkloadWillRun(self):
"Returns True if this workload will be run"
return self._donotrun is False
def run(self):
"Workload thread runner - takes care of keeping the workload running as long as needed"
if self.shouldStop():
return
# Initial workload setups
self._WorkloadSetup()
if not self._donotrun:
# Compile the workload
self._WorkloadBuild()
# Do final preparations of workload before we're ready to start running
self._WorkloadPrepare()
# Wait until we're released
while True:
if self.shouldStop():
return
self.__events["start"].wait(1.0)
if self.shouldStart():
break
self._log(Log.DEBUG, f"Starting {self._module_type} workload")
self.__timestamps["runloop_start"] = datetime.now()
while not self.shouldStop():
# Run the workload
self._WorkloadTask()
if self.shouldStop():
break
time.sleep(self.__sleeptime)
self.__timestamps["runloop_stop"] = datetime.now()
self._log(Log.DEBUG, f"stopping {self._module_type} workload")
else:
self._log(Log.DEBUG, "Workload was not started")
self._WorkloadCleanup()
def MakeReport(self):
""" required module method, needs to return an libxml2.xmlNode object
with the the results from running
"""
raise NotImplementedError(f"MakeReport() method must be implemented in the {self._name} module")
def GetTimestamps(self):
"Return libxml2.xmlNode object with the gathered timestamps"
ts_n = libxml2.newNode("timestamps")
for k in list(self.__timestamps.keys()):
ts_n.newChild(None, k, str(self.__timestamps[k]))
return ts_n
class ModuleContainer:
"""The ModuleContainer keeps an overview over loaded modules and the objects it
will instantiate. These objects are accessed by iterating the ModuleContainer object."""
def __init__(self, modules_root, logger):
"""Creates a ModuleContainer object. modules_root defines the default
directory where the modules will be loaded from. logger should point to a Log()
object which will be used for logging and will also be given to the instantiated
objects during module import."""
if logger and not isinstance(logger, Log):
raise TypeError("logger attribute is not a Log() object")
self.__modules_root = modules_root
self.__modtype = modules_root.split('.')[-1]
self.__logger = logger
self.__modobjects = {} # Keeps track of instantiated objects
self.__modsloaded = {} # Keeps track of imported modules
self.__iter_list = None
def LoadModule(self, modname, modroot=None):
"""Imports a module and saves references to the imported module.
If the same module is tried imported more times, it will return the module
reference from the first import"""
if modroot is None:
modroot = self.__modules_root
# If this module is already reported return the module,
# if not (except KeyError:) import it and return the imported module
try:
idxname = f"{modroot}.{modname}"
return self.__modsloaded[idxname]
except KeyError:
self.__logger.log(Log.INFO, f"importing module {modname}")
mod = __import__(f"rteval.{modroot}.{modname}",
fromlist=f"rteval.{modroot}")
self.__modsloaded[idxname] = mod
return mod
def ModuleInfo(self, modname, modroot=None):
"""Imports a module and calls the modules' ModuleInfo() function and returns
the information provided by the module"""
mod = self.LoadModule(modname, modroot)
return mod.ModuleInfo()
def SetupModuleOptions(self, parser, config):
"""Sets up a separate argparse ArgumentGroup per module with its supported parameters"""
grparser = parser.add_argument_group(f"Group Options for {self.__modtype} modules")
grparser.add_argument(f'--{self.__modtype}-cpulist',
dest=f'{self.__modtype}___cpulist', action='store', default="",
help=f'CPU list where {self.__modtype} modules will run',
metavar='LIST')
for (modname, mod) in list(self.__modsloaded.items()):
opts = mod.ModuleParameters()
if len(opts) == 0:
continue
shortmod = modname.split('.')[-1]
try:
cfg = config.GetSection(shortmod)
except KeyError:
# Ignore if a section is not found
cfg = None
modgrparser = parser.add_argument_group(f"Options for the {shortmod} module")
for (o, s) in list(opts.items()):
descr = 'descr' in s and s['descr'] or ""
metavar = 'metavar' in s and s['metavar'] or None
try:
default = cfg and getattr(cfg, o) or None
except AttributeError:
# Ignore if this isn't found in the configuration object
default = None
if default is None:
default = 'default' in s and s['default'] or None
modgrparser.add_argument(f'--{shortmod}-{o}',
dest=f"{shortmod}___{o}",
action='store',
help='%s%s' % (descr,
default and ' (default: %s)' % default or ''),
default=default,
metavar=metavar)
return grparser
def InstantiateModule(self, modname, modcfg, modroot=None):
"""Imports a module and instantiates an object from the modules create() function.
The instantiated object is returned in this call"""
if modcfg and not isinstance(modcfg, rtevalCfgSection):
raise TypeError("modcfg attribute is not a rtevalCfgSection() object")
mod = self.LoadModule(modname, modroot)
return mod.create(modcfg, self.__logger)
def RegisterModuleObject(self, modname, modobj):
"""Registers an instantiated module object. This module object will be
returned when a ModuleContainer object is iterated over"""
self.__modobjects[modname] = modobj
def ExportModule(self, modname, modroot=None):
"Export module info, used to transfer an imported module to another ModuleContainer"
if modroot is None:
modroot = self.__modules_root
mod = f"{modroot}.{modname}"
return (mod, self.__modsloaded[mod])
def ImportModule(self, module):
"Imports an exported module from another ModuleContainer"
(modname, moduleimp) = module
self.__modsloaded[modname] = moduleimp
def ModulesLoaded(self):
"Returns number of registered module objects"
return len(self.__modobjects)
def GetModulesList(self):
"Returns a list of module names"
return list(self.__modobjects.keys())
def GetNamedModuleObject(self, modname):
"Looks up a named module and returns its registered module object"
return self.__modobjects[modname]
def __iter__(self):
"Initiates the iterating process"
self.__iter_list = list(self.__modobjects.keys())
return self
def __next__(self):
""" Internal Python iterating method, returns the next
module name and object to be processed
"""
if len(self.__iter_list) == 0:
self.__iter_list = None
raise StopIteration
modname = self.__iter_list.pop()
return (modname, self.__modobjects[modname])
class RtEvalModules:
""" RtEvalModules should normally be inherrited by a more specific module
class. This class takes care of managing imported modules and has methods
for starting and stopping the workload these modules contains.
"""
def __init__(self, config, modules_root, logger):
""" Initialises the RtEvalModules() internal variables.
The modules_root argument should point at the root directory where
the modules will be loaded from. The logger argument should point
to a Log() object which will be used for logging and will also be given
to the instantiated objects during module import.
"""
self._cfg = config
self._logger = logger
self.__modules = ModuleContainer(modules_root, logger)
self.__timestamps = {}
# Export some of the internal module container methods
# Primarily to have better control of the module containers
# iteration API
def _ImportModule(self, module):
"Imports a module exported by ModuleContainer::ExportModule()"
return self.__modules.ImportModule(module)
def _InstantiateModule(self, modname, modcfg, modroot=None):
"Imports a module and returns an instantiated object from the module"
return self.__modules.InstantiateModule(modname, modcfg, modroot)
def _RegisterModuleObject(self, modname, modobj):
"Registers an instantiated module object which RtEvalModules will control"
return self.__modules.RegisterModuleObject(modname, modobj)
def _LoadModule(self, modname, modroot=None):
"Loads and imports a module"
return self.__modules.LoadModule(modname, modroot)
def ModulesLoaded(self):
"Returns number of imported modules"
return self.__modules.ModulesLoaded()
def GetModulesList(self):
"Returns a list of module names"
return self.__modules.GetModulesList()
def SetupModuleOptions(self, parser):
"Sets up argparse based argument groups for the loaded modules"
return self.__modules.SetupModuleOptions(parser, self._cfg)
def GetNamedModuleObject(self, modname):
"Returns a list of module names"
return self.__modules.GetNamedModuleObject(modname)
# End of exports
def Start(self):
""" Prepares all the imported modules workload to start, but they
will not start their workloads yet
"""
if self.__modules.ModulesLoaded() == 0:
raise rtevalRuntimeError(f"No {self._module_type} modules configured")
self._logger.log(Log.INFO, f"Preparing {self._module_type} modules")
exclusive = 0
for (modname, mod) in self.__modules:
if mod.is_exclusive() and mod.WorkloadWillRun():
exclusive += 1
for (modname, mod) in self.__modules:
if exclusive >= 1:
if exclusive != 1:
msg = f"More than one exclusive load: {exclusive}"
raise RuntimeError(msg)
if not mod.is_exclusive() and mod.WorkloadWillRun():
mod.set_donotrun()
mod.start()
if mod.WorkloadWillRun():
self._logger.log(Log.DEBUG, f"\t - Started {modname} preparations")
self._logger.log(Log.DEBUG, f"Waiting for all {self._module_type} modules to get ready")
busy = True
while busy:
busy = False
for (modname, mod) in self.__modules:
if not mod.isReady():
if not mod.hadRuntimeError():
busy = True
self._logger.log(Log.DEBUG, f"Waiting for {modname}")
else:
raise RuntimeError(f"Runtime error starting the {modname} {self._module_type} module")
if busy:
time.sleep(1)
self._logger.log(Log.DEBUG, f"All {self._module_type} modules are ready")
def hadError(self):
"Returns True if one or more modules had a RuntimeError"
return self.__runtimeError
def Unleash(self):
"""Unleashes all the loaded modules workloads"""
# turn loose the loads
nthreads = 0
self._logger.log(Log.INFO, f"Sending start event to all {self._module_type} modules")
for (modname, mod) in self.__modules:
mod.setStart()
nthreads += 1
self.__timestamps['unleash'] = datetime.now()
return nthreads
def _isAlive(self):
"""Returns True if all modules are running"""
for (modname, mod) in self.__modules:
# We requiring all modules to run to pass
if not mod.WorkloadAlive():
return False
return True
def Stop(self):
"""Stops all the running workloads from in all the loaded modules"""
if self.ModulesLoaded() == 0:
raise RuntimeError(f"No {self._module_type} modules configured")
self._logger.log(Log.INFO, f"Stopping {self._module_type} modules")
for (modname, mod) in self.__modules:
if not mod.WorkloadWillRun():
continue
mod.setStop()
try:
self._logger.log(Log.DEBUG, f"\t - Stopping {modname}")
if mod.is_alive():
mod.join(2.0)
except RuntimeError as e:
self._logger.log(Log.ERR, f"\t\tFailed stopping {modname}: {str(e)}")
self.__timestamps['stop'] = datetime.now()
def WaitForCompletion(self, wtime=None):
"""Waits for the running modules to complete their running"""
self._logger.log(Log.INFO, f"Waiting for {self._module_type} modules to complete")
for (modname, mod) in self.__modules:
self._logger.log(Log.DEBUG, f"\t - Waiting for {modname}")
mod.WaitForCompletion(wtime)
self._logger.log(Log.DEBUG, f"All {self._module_type} modules completed")
def MakeReport(self):
"""Collects all the loaded modules reports in a single libxml2.xmlNode() object"""
rep_n = libxml2.newNode(self._report_tag)
for (modname, mod) in self.__modules:
self._logger.log(Log.DEBUG, f"Getting report from {modname}")
modrep_n = mod.MakeReport()
if modrep_n is not None:
if self._module_type != 'load':
# Currently the <loads/> tag will not easily integrate
# timestamps. Not sure it makes sense to track this on
# load modules.
modrep_n.addChild(mod.GetTimestamps())
rep_n.addChild(modrep_n)
return rep_n