# blob: de70e03087731ed48a650a0f4cd413c3200fbda0 [file] [log] [blame]
#
# Copyright 2012 - 2013 David Sommerseth <davids@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# For the avoidance of doubt the "preferred form" of this code is one which
# is in an open unpatent encumbered format. Where cryptographic key signing
# forms part of the process of creating an executable the information
# including keys needed to generate an equivalently functional executable
# are deemed to be part of the source code.
#
from rteval.Log import Log
from rteval.rtevalConfig import rtevalCfgSection
from datetime import datetime
import time, libxml2, threading, optparse
# Public names exported by a wildcard import of this module.
__all__ = ["rtevalRuntimeError", "rtevalModulePrototype", "ModuleContainer", "RtEvalModules"]
class rtevalRuntimeError(RuntimeError):
    """RuntimeError raised on behalf of an rteval module.

    Creating the exception also flags the failure on the reporting object
    itself, by calling its _setRuntimeError() method.
    """

    def __init__(self, mod, message):
        """Create the exception and flag the error on mod.

        mod     - the object reporting the error; it must provide a
                  _setRuntimeError() method
        message - the error description, passed on to RuntimeError
        """
        RuntimeError.__init__(self, message)

        # The module had a RuntimeError, we set the flag
        mod._setRuntimeError()
class rtevalModulePrototype(threading.Thread):
    """Prototype class for rteval modules - must be inherited by the real module.

    The class is a threading.Thread whose run() method drives the workload
    through the _Workload*() hook methods, all of which the inheriting module
    must implement.  Three threading.Event objects ("start", "stop" and
    "finished") are used to control and track the workload, and the time each
    of them was set is recorded for GetTimestamps().
    """

    def __init__(self, modtype, name, logger=None):
        """Initialise the module thread object.

        modtype - module type, used in the log messages
        name    - module name, used as log prefix and in error messages
        logger  - optional Log() object; without one nothing gets logged

        Raises TypeError if logger is given but is not a Log() object.
        """
        if logger and not isinstance(logger, Log):
            raise TypeError("logger attribute is not a Log() object")

        threading.Thread.__init__(self)
        self._module_type = modtype
        self._name = name
        self.__logger = logger
        self.__ready = False
        self.__runtimeError = False
        self.__events = {"start": threading.Event(),
                         "stop": threading.Event(),
                         "finished": threading.Event()}
        # When set to True, run() skips build, prepare and the workload loop
        self._donotrun = False
        self.__timestamps = {}

    def _log(self, logtype, msg):
        "Common log function for rteval modules; a no-op when no logger is set"
        if self.__logger:
            self.__logger.log(logtype, "[%s] %s" % (self._name, msg))

    def isReady(self):
        "Returns a boolean if the module is ready to run"
        # A module which will not run never blocks the others from starting
        if self._donotrun:
            return True
        return self.__ready

    def _setReady(self, state=True):
        "Sets the ready flag for the module"
        self.__ready = state

    def hadRuntimeError(self):
        "Returns a boolean if the module had a RuntimeError"
        return self.__runtimeError

    def _setRuntimeError(self, state=True):
        "Sets the runtimeError flag for the module"
        self.__runtimeError = state

    def setStart(self):
        "Sets the start event state and records when it happened"
        self.__events["start"].set()
        self.__timestamps["start_set"] = datetime.now()

    def shouldStart(self):
        "Returns the start event state - indicating the module can start"
        return self.__events["start"].is_set()

    def setStop(self):
        "Sets the stop event state and records when it happened"
        self.__events["stop"].set()
        self.__timestamps["stop_set"] = datetime.now()

    def shouldStop(self):
        "Returns the stop event state - indicating the module should stop"
        return self.__events["stop"].is_set()

    def _setFinished(self):
        "Sets the finished event state - indicating the module has completed"
        self.__events["finished"].set()
        self.__timestamps["finished_set"] = datetime.now()

    def WaitForCompletion(self, wtime=None):
        """Blocks until the module has completed its workload.

        wtime is the maximum number of seconds to wait; None waits without
        a time limit.  Returns None if the module has not been given the
        start event, otherwise the result of waiting on the finished event.
        """
        if not self.shouldStart():
            # If it hasn't been started yet, nothing to wait for
            return None
        return self.__events["finished"].wait(wtime)

    def _WorkloadSetup(self):
        "Required module method, which purpose is to do the initial workload setup, preparing for _WorkloadBuild()"
        raise NotImplementedError("_WorkloadSetup() method must be implemented in the %s module" % self._name)

    def _WorkloadBuild(self):
        "Required module method, which purpose is to compile additional code needed for the workload"
        raise NotImplementedError("_WorkloadBuild() method must be implemented in the %s module" % self._name)

    def _WorkloadPrepare(self):
        "Required module method, which will initialise and prepare the workload just before it is about to start"
        raise NotImplementedError("_WorkloadPrepare() method must be implemented in the %s module" % self._name)

    def _WorkloadTask(self):
        "Required module method, which kicks off the workload"
        raise NotImplementedError("_WorkloadTask() method must be implemented in the %s module" % self._name)

    def WorkloadAlive(self):
        "Required module method, which should return True if the workload is still alive"
        raise NotImplementedError("WorkloadAlive() method must be implemented in the %s module" % self._name)

    def _WorkloadCleanup(self):
        "Required module method, which will be run after the _WorkloadTask() has completed or been aborted by the 'stop event flag'"
        raise NotImplementedError("_WorkloadCleanup() method must be implemented in the %s module" % self._name)

    def WorkloadWillRun(self):
        "Returns True if this workload will be run"
        return self._donotrun is False

    def run(self):
        """Workload thread runner - takes care of keeping the workload running as long as needed.

        Order of events: _WorkloadSetup(), then - unless _donotrun got set -
        _WorkloadBuild() and _WorkloadPrepare(), a wait for the start event,
        and finally repeated calls to _WorkloadTask() until the stop event is
        set or WorkloadAlive() returns False.  _WorkloadCleanup() is called at
        the end.  If the stop event is set before the thread begins, or while
        it is waiting for the start event, the method returns right away
        without calling _WorkloadCleanup().
        """
        if self.shouldStop():
            return

        # Initial workload setups
        self._WorkloadSetup()

        if not self._donotrun:
            # Compile the workload
            self._WorkloadBuild()

            # Do final preparations of workload before we're ready to start running
            self._WorkloadPrepare()

            # Wait until we're released
            while True:
                if self.shouldStop():
                    return
                self.__events["start"].wait(1.0)
                if self.shouldStart():
                    break

            self._log(Log.DEBUG, "Starting %s workload" % self._module_type)
            self.__timestamps["runloop_start"] = datetime.now()
            while not self.shouldStop():
                # Run the workload
                self._WorkloadTask()

                if self.shouldStop():
                    break
                if not self.WorkloadAlive():
                    self._log(Log.DEBUG, "%s workload stopped running." % self._module_type)
                    break

                # Pause for up to a minute between the rounds, but wake up
                # at once when the stop event is set so that a stop request
                # is not left unnoticed for the rest of the pause
                self.__events["stop"].wait(60.0)

            self.__timestamps["runloop_stop"] = datetime.now()
            self._log(Log.DEBUG, "stopping %s workload" % self._module_type)
        else:
            self._log(Log.DEBUG, "Workload was not started")

        self._WorkloadCleanup()

    def MakeReport(self):
        "Required module method, needs to return a libxml2.xmlNode object with the results from running"
        raise NotImplementedError("MakeReport() method must be implemented in the %s module" % self._name)

    def GetTimestamps(self):
        "Return libxml2.xmlNode object with the gathered timestamps"
        ts_n = libxml2.newNode("timestamps")
        # One child node per recorded timestamp, named after the event
        for (tsname, tsval) in self.__timestamps.items():
            ts_n.newChild(None, tsname, str(tsval))
        return ts_n
class ModuleContainer(object):
    """The ModuleContainer keeps an overview over loaded modules and the objects it
    will instantiate.  These objects are accessed by iterating the ModuleContainer
    object, which yields (module name, module object) tuples.

    The container is its own iterator and keeps a single iteration state, so
    only one iteration over a container can be in progress at a time."""

    def __init__(self, modules_root, logger):
        """Creates a ModuleContainer object.  modules_root defines the default
        directory where the modules will be loaded from.  logger should point to a Log()
        object which will be used for logging and will also be given to the instantiated
        objects during module import.

        Raises TypeError if logger is set but is not a Log() object."""
        if logger and not isinstance(logger, Log):
            raise TypeError("logger attribute is not a Log() object")

        self.__modules_root = modules_root
        self.__logger = logger
        self.__modobjects = {}  # Keeps track of instantiated objects
        self.__modsloaded = {}  # Keeps track of imported modules
        self.__iter_list = None

    def LoadModule(self, modname, modroot=None):
        """Imports a module and saves references to the imported module.
        If the same module is tried imported more times, it will return the module
        reference from the first import.

        modroot overrides the modules_root given to the constructor."""
        if modroot is None:
            modroot = self.__modules_root

        # If this module is already imported, return that module
        idxname = "%s.%s" % (modroot, modname)
        try:
            return self.__modsloaded[idxname]
        except KeyError:
            pass

        # The constructor accepts a missing logger, so do not depend on one
        if self.__logger:
            self.__logger.log(Log.INFO, "importing module %s" % modname)

        # The non-empty fromlist makes __import__() return the module named
        # by the full dotted path instead of the top-level rteval package
        mod = __import__("rteval.%s.%s" % (modroot, modname),
                         fromlist="rteval.%s" % modroot)
        self.__modsloaded[idxname] = mod
        return mod

    def ModuleInfo(self, modname, modroot=None):
        """Imports a module and calls the modules' ModuleInfo() function and returns
        the information provided by the module"""
        mod = self.LoadModule(modname, modroot)
        return mod.ModuleInfo()

    def SetupModuleOptions(self, parser, config):
        """Sets up a separate optparse OptionGroup per module with its supported parameters.

        parser is the optparse parser the groups are added to.  config is used
        to look up a configuration section named after each module; values
        found there take precedence over the defaults declared by the module's
        ModuleParameters() function.  Each option is named
        --<module>-<parameter> and stored as <module>___<parameter>."""
        for (modname, mod) in self.__modsloaded.items():
            opts = mod.ModuleParameters()
            if not opts:
                continue

            shortmod = modname.split('.')[-1]
            try:
                cfg = config.GetSection(shortmod)
            except KeyError:
                # Ignore if a section is not found
                cfg = None

            grparser = optparse.OptionGroup(parser, "Options for the %s module" % shortmod)
            for (optname, optspec) in opts.items():
                descr = optspec.get('descr') or ""
                metavar = optspec.get('metavar') or None

                # Empty or otherwise false values are treated as "not set",
                # both for the configured value and for the module default
                default = None
                if cfg:
                    try:
                        default = getattr(cfg, optname) or None
                    except AttributeError:
                        # Ignore if this isn't found in the configuration object
                        default = None

                if default is None:
                    default = optspec.get('default') or None

                if default:
                    defaulttxt = ' (default: %s)' % (default,)
                else:
                    defaulttxt = ''

                grparser.add_option('--%s-%s' % (shortmod, optname),
                                    dest="%s___%s" % (shortmod, optname),
                                    action='store',
                                    help='%s%s' % (descr, defaulttxt),
                                    default=default,
                                    metavar=metavar)
            parser.add_option_group(grparser)

    def InstantiateModule(self, modname, modcfg, modroot=None):
        """Imports a module and instantiates an object from the modules create() function.
        The instantiated object is returned in this call.

        Raises TypeError if modcfg is set but is not a rtevalCfgSection() object."""
        if modcfg and not isinstance(modcfg, rtevalCfgSection):
            raise TypeError("modcfg attribute is not a rtevalCfgSection() object")

        mod = self.LoadModule(modname, modroot)
        return mod.create(modcfg, self.__logger)

    def RegisterModuleObject(self, modname, modobj):
        """Registers an instantiated module object.  This module object will be
        returned when a ModuleContainer object is iterated over"""
        self.__modobjects[modname] = modobj

    def ExportModule(self, modname, modroot=None):
        """Export module info, used to transfer an imported module to another ModuleContainer.

        Returns a (index name, imported module) tuple.  Raises KeyError if the
        module has not been imported by this container."""
        if modroot is None:
            modroot = self.__modules_root

        mod = "%s.%s" % (modroot, modname)
        return (mod, self.__modsloaded[mod])

    def ImportModule(self, module):
        "Imports an exported module from another ModuleContainer"
        (modname, moduleimp) = module
        self.__modsloaded[modname] = moduleimp

    def ModulesLoaded(self):
        "Returns number of registered module objects"
        return len(self.__modobjects)

    def GetModulesList(self):
        "Returns a list of module names"
        return list(self.__modobjects.keys())

    def GetNamedModuleObject(self, modname):
        "Looks up a named module and returns its registered module object"
        return self.__modobjects[modname]

    def __iter__(self):
        "Initiates the iterating process"
        # A private copy of the names, which next() consumes from the end
        self.__iter_list = list(self.__modobjects.keys())
        return self

    def next(self):
        """Internal Python iterating method, returns the next
        module name and object to be processed"""
        # Covers both an exhausted list and no iteration in progress (None),
        # so repeated calls after the end keep raising StopIteration
        if not self.__iter_list:
            self.__iter_list = None
            raise StopIteration

        modname = self.__iter_list.pop()
        return (modname, self.__modobjects[modname])

    # Name of the same iterator method in the Python 3 iterator protocol
    __next__ = next
class RtEvalModules(object):
    """RtEvalModules should normally be inherited by a more specific module class.
    This class takes care of managing imported modules and have methods for starting
    and stopping the workload these modules contains.

    The methods read self._module_type (used in log and error messages) and
    self._report_tag (name of the report node), neither of which is set by
    this class - presumably the inheriting class is expected to provide them."""

    def __init__(self, config, modules_root, logger):
        """Initialises the RtEvalModules() internal variables.  The modules_root
        argument should point at the root directory where the modules will be loaded from.
        The logger argument should point to a Log() object which will be used for logging
        and will also be given to the instantiated objects during module import."""
        self._cfg = config
        self._logger = logger
        self.__modules = ModuleContainer(modules_root, logger)
        self.__timestamps = {}
        # Set through _setRuntimeError(), reported by hadError()
        self.__runtimeError = False

    # Export some of the internal module container methods
    # Primarily to have better control of the module containers
    # iteration API
    def _ImportModule(self, module):
        "Imports a module exported by ModuleContainer::ExportModule()"
        return self.__modules.ImportModule(module)

    def _InstantiateModule(self, modname, modcfg, modroot=None):
        "Imports a module and returns an instantiated object from the module"
        return self.__modules.InstantiateModule(modname, modcfg, modroot)

    def _RegisterModuleObject(self, modname, modobj):
        "Registers an instantiated module object which RtEvalModules will control"
        return self.__modules.RegisterModuleObject(modname, modobj)

    def _LoadModule(self, modname, modroot=None):
        "Loads and imports a module"
        return self.__modules.LoadModule(modname, modroot)

    def ModulesLoaded(self):
        "Returns number of registered module objects"
        return self.__modules.ModulesLoaded()

    def GetModulesList(self):
        "Returns a list of module names"
        return self.__modules.GetModulesList()

    def SetupModuleOptions(self, parser):
        "Sets up optparse based option groups for the loaded modules"
        return self.__modules.SetupModuleOptions(parser, self._cfg)

    def GetNamedModuleObject(self, modname):
        "Looks up a named module and returns its registered module object"
        return self.__modules.GetNamedModuleObject(modname)
    # End of exports

    def _setRuntimeError(self, state=True):
        "Sets the runtimeError flag; called by rtevalRuntimeError when raised on behalf of this object"
        self.__runtimeError = state

    def Start(self):
        """Prepares all the imported modules workload to start, but they will not
        start their workloads yet.

        Starts the thread of every registered module object and then polls
        once a second until all of them report being ready.

        Raises rtevalRuntimeError (a RuntimeError) if no module objects are
        registered, and RuntimeError if a module which is not ready reports
        having had a runtime error."""
        if self.__modules.ModulesLoaded() == 0:
            # rtevalRuntimeError needs the reporting object as its first
            # argument; it flags the error on it via _setRuntimeError()
            raise rtevalRuntimeError(self, "No %s modules configured" % self._module_type)

        self._logger.log(Log.INFO, "Preparing %s modules" % self._module_type)
        for (modname, mod) in self.__modules:
            mod.start()
            if mod.WorkloadWillRun():
                self._logger.log(Log.DEBUG, "\t - Started %s preparations" % modname)

        self._logger.log(Log.DEBUG, "Waiting for all %s modules to get ready" % self._module_type)
        busy = True
        while busy:
            busy = False
            for (modname, mod) in self.__modules:
                if mod.isReady():
                    continue
                if mod.hadRuntimeError():
                    raise RuntimeError("Runtime error starting the %s %s module" % (modname, self._module_type))
                busy = True
                self._logger.log(Log.DEBUG, "Waiting for %s" % modname)

            if busy:
                time.sleep(1)

        self._logger.log(Log.DEBUG, "All %s modules are ready" % self._module_type)

    def hadError(self):
        "Returns True if one or more modules had a RuntimeError"
        if self.__runtimeError:
            return True

        # Look the objects up by name rather than iterating the container,
        # which has only one shared iteration state
        for modname in self.__modules.GetModulesList():
            if self.__modules.GetNamedModuleObject(modname).hadRuntimeError():
                return True
        return False

    def Unleash(self):
        """Unleashes all the loaded modules workloads.

        Returns the number of modules which were sent the start event."""
        # turn loose the loads
        nthreads = 0
        self._logger.log(Log.INFO, "Sending start event to all %s modules" % self._module_type)
        for (modname, mod) in self.__modules:
            mod.setStart()
            nthreads += 1

        self.__timestamps['unleash'] = datetime.now()
        return nthreads

    def _isAlive(self):
        """Returns True if all modules are running"""
        for (modname, mod) in self.__modules:
            # We are requiring all modules to run to pass
            if not mod.WorkloadAlive():
                return False
        return True

    def Stop(self):
        """Stops all the running workloads in all the loaded modules.

        Every module which will run is sent the stop event and its thread, if
        still alive, is given up to 2 seconds to finish.

        Raises RuntimeError if no module objects are registered."""
        if self.ModulesLoaded() == 0:
            raise RuntimeError("No %s modules configured" % self._module_type)

        self._logger.log(Log.INFO, "Stopping %s modules" % self._module_type)
        for (modname, mod) in self.__modules:
            if not mod.WorkloadWillRun():
                continue

            mod.setStop()
            try:
                self._logger.log(Log.DEBUG, "\t - Stopping %s" % modname)
                if mod.is_alive():
                    mod.join(2.0)
            except RuntimeError as e:
                # Failing to stop one module must not prevent stopping the rest
                self._logger.log(Log.ERR, "\t\tFailed stopping %s: %s" % (modname, str(e)))

        self.__timestamps['stop'] = datetime.now()

    def WaitForCompletion(self, wtime=None):
        """Waits for the running modules to complete their running.

        wtime is handed to the WaitForCompletion() method of each module, one
        module after the other."""
        self._logger.log(Log.INFO, "Waiting for %s modules to complete" % self._module_type)
        for (modname, mod) in self.__modules:
            self._logger.log(Log.DEBUG, "\t - Waiting for %s" % modname)
            mod.WaitForCompletion(wtime)
        self._logger.log(Log.DEBUG, "All %s modules completed" % self._module_type)

    def MakeReport(self):
        """Collects all the loaded modules reports in a single libxml2.xmlNode() object.

        Modules whose MakeReport() returns None are left out of the report."""
        rep_n = libxml2.newNode(self._report_tag)

        for (modname, mod) in self.__modules:
            self._logger.log(Log.DEBUG, "Getting report from %s" % modname)
            modrep_n = mod.MakeReport()
            if modrep_n is None:
                continue

            if self._module_type != 'load':
                # Currently the <loads/> tag will not easily integrate
                # timestamps.  Not sure it makes sense to track this on
                # load modules.
                modrep_n.addChild(mod.GetTimestamps())
            rep_n.addChild(modrep_n)

        return rep_n