blob: 710d50a1fc0f9d811fdda7f93d8845e8ee0babc4 [file] [log] [blame]
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright 2009 - 2013 Clark Williams <williams@redhat.com>
#
import sys
import re
import codecs
import libxml2
import lxml.etree
def convert_libxml2_to_lxml_doc(inxml):
"Converts a libxml2.xmlDoc into a lxml.etree document object"
if not isinstance(inxml, libxml2.xmlDoc):
raise TypeError('Function requires an libxml2.xmlDoc as input')
root = inxml.getRootElement()
ret = lxml.etree.XML(root.serialize('UTF-8'))
del root
return ret
def convert_lxml_to_libxml2_nodes(inlxml):
"Converts a lxml.etree elements tree into a libxml2.xmlNode object"
if not isinstance(inlxml, lxml.etree._Element) \
and not isinstance(inlxml, lxml.etree._XSLTResultTree):
raise TypeError('Function requires an lxml.etree object as input')
return libxml2.parseDoc(bytes.decode(lxml.etree.tostring(inlxml))).getRootElement()
class XMLOut:
'''Class to create XML output'''
def __init__(self, roottag, version, attr=None, encoding='UTF-8'):
self.level = 0
self.encoding = encoding
self.rootattr = attr
self.version = version
# 0 - no report created/loaded
# 1 - new report
# 2 - loaded report
# 3 - XML closed
self.status = 0
self.tag_trans = self.__setup_tag_trans()
self.roottag = self.__fixtag(roottag)
self.xmldoc = None
def __del__(self):
if self.level > 0:
raise RuntimeError(f"XMLOut: open blocks at __del__ (last opened '{self.currtag.name}')")
if self.xmldoc is not None:
self.xmldoc.freeDoc()
def __setup_tag_trans(self):
t = str.maketrans('', '')
return t
# t = t.replace(' ', '_')
# t = t.replace('\t', '_')
# t = t.replace('(', '_')
# t = t.replace(')', '_')
# t = t.replace(':', '-')
def __fixtag(self, tagname):
if not isinstance(tagname, str):
return str(tagname)
return tagname.translate(self.tag_trans)
def __encode(self, value, tagmode=False):
if isinstance(value, str):
val = value
elif isinstance(value, str):
val = str(value)
else:
val = str(str(value))
if tagmode is True:
rx = re.compile(" ")
val = rx.sub("_", val)
return val
def __add_attributes(self, node, attr):
if attr is not None:
for k, v in list(attr.items()):
node.newProp(k, self.__encode(v))
def __parseToXML(self, node, data):
# All supported variable types needs to be set up
# here. TypeError exception will be raised on
# unknown types.
t = type(data)
if t is str or t is str or t is int or t is float:
n = libxml2.newText(self.__encode(data))
node.addChild(n)
elif t is bool:
v = data and "1" or "0"
n = libxml2.newText(self.__encode(v))
node.addChild(n)
elif t is dict:
for (key, val) in list(data.items()):
node2 = libxml2.newNode(self.__encode(self.parsedata_prefix + key, True))
self.__parseToXML(node2, val)
node.addChild(node2)
elif t is tuple:
for v in data:
if isinstance(v, dict):
self.__parseToXML(node, v)
else:
n = libxml2.newNode(self.tuple_tagname)
self.__parseToXML(n, v)
node.addChild(n)
else:
raise TypeError(f"unhandled type ({str(type(data))}) for value '{str(data)}'")
def close(self):
if self.status == 0:
raise RuntimeError("XMLOut: No XML document is created nor loaded")
if self.status == 3:
raise RuntimeError("XMLOut: XML document already closed")
if self.level > 0:
raise RuntimeError(f"XMLOut: open blocks at close() (last opened '{self.currtag.name}')")
if self.status == 1: # Only set the root node in the doc on created reports (NewReport called)
self.xmldoc.setRootElement(self.xmlroot)
self.status = 3
def NewReport(self):
if self.status != 0 and self.status != 3:
raise RuntimeError("XMLOut: Cannot start a new report without closing the currently opened one")
if self.status == 3:
self.xmldoc.freeDoc() # Free the report from memory if we have one already
self.xmldoc = libxml2.newDoc("1.0")
self.xmlroot = libxml2.newNode(self.roottag)
self.__add_attributes(self.xmlroot, {'version': self.version})
self.__add_attributes(self.xmlroot, self.rootattr)
self.currtag = self.xmlroot
self.level = 0
self.status = 1
def LoadReport(self, filename, validate_version=False):
if self.status == 3:
self.xmldoc.freeDoc() # Free the report from memory if we have one already
self.xmldoc = libxml2.parseFile(filename)
if self.xmldoc.name != filename:
self.status = 3
raise RuntimeError("XMLOut: Loading report failed")
root = self.xmldoc.children
if root.name != self.roottag:
self.status = 3
raise RuntimeError(f"XMLOut: Loaded report is not a valid {self.roottag} XML file")
if validate_version is True:
ver = root.hasProp('version')
if ver is None:
self.status = 3
raise RuntimeError("XMLOut: Loaded report is missing version attribute in root node")
if ver.getContent() != self.version:
self.status = 3
raise RuntimeError(f"XMLOut: Loaded report is not of version {self.version}")
self.status = 2 # Confirm that we have loaded a report from file
def Write(self, filename, xslt=None):
if self.status != 2 and self.status != 3:
raise RuntimeError("XMLOut: XML document is not closed")
if xslt is None:
# If no XSLT template is give, write raw XML
self.xmldoc.saveFormatFileEnc(filename, self.encoding, 1)
return
# Load XSLT file and prepare the XSLT parser
xsltfile = open(xslt, 'r')
xsltdoc = lxml.etree.parse(xsltfile)
parser = lxml.etree.XSLT(xsltdoc)
xsltfile.close()
# imitate libxml2's filename interpretation
if filename != "-":
dstfile = codecs.open(filename, "w", encoding=self.encoding)
else:
dstfile = sys.stdout
#
# Parse XML+XSLT and write the result to file
#
xmldoc = convert_libxml2_to_lxml_doc(self.xmldoc)
resdoc = parser(xmldoc)
# Write the file with the requested output encoding
dstfile.write(bytes.decode(str(resdoc).encode(self.encoding)))
if dstfile != sys.stdout:
dstfile.close()
# Clean up
del resdoc
del xsltdoc
del parser
del xsltfile
del xmldoc
def GetXMLdocument(self):
if self.status != 2 and self.status != 3:
raise RuntimeError("XMLOut: XML document is not closed")
return self.xmldoc
def openblock(self, tagname, attributes=None):
if self.status != 1:
raise RuntimeError("XMLOut: openblock() cannot be called before NewReport() is called")
ntag = libxml2.newNode(self.__fixtag(tagname))
self.__add_attributes(ntag, attributes)
self.currtag.addChild(ntag)
self.currtag = ntag
self.level += 1
return ntag
def closeblock(self):
if self.status != 1:
raise RuntimeError("XMLOut: closeblock() cannot be called before NewReport() is called")
if self.level == 0:
raise RuntimeError("XMLOut: no open tags to close")
self.currtag = self.currtag.get_parent()
self.level -= 1
return self.currtag
def taggedvalue(self, tag, value, attributes=None):
if self.status != 1:
raise RuntimeError("XMLOut: taggedvalue() cannot be called before NewReport() is called")
ntag = self.currtag.newTextChild(None, self.__fixtag(tag), self.__encode(value))
self.__add_attributes(ntag, attributes)
return ntag
def ParseData(self, tagname, data, attributes=None, tuple_tagname="tuples", prefix=""):
if self.status != 1:
raise RuntimeError("XMLOut: taggedvalue() cannot be called before NewReport() is called")
self.tuple_tagname = self.__fixtag(tuple_tagname)
self.parsedata_prefix = prefix
ntag = libxml2.newNode(self.__fixtag(tagname))
self.__add_attributes(ntag, attributes)
self.__parseToXML(ntag, data)
self.currtag.addChild(ntag)
return ntag
def AppendXMLnodes(self, nodes):
if not isinstance(nodes, libxml2.xmlNode):
raise ValueError("Input value is not a libxml2.xmlNode")
return self.currtag.addChild(nodes)
def unit_test(rootdir):
try:
x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8')
x.NewReport()
x.openblock('run_info', {'days': 0, 'hours': 0, 'minutes': 32, 'seconds': 18})
x.taggedvalue('time', '11:22:33')
x.taggedvalue('date', '2000-11-22')
x.closeblock()
x.openblock('uname')
x.taggedvalue('node', 'testing - \xe6\xf8')
x.taggedvalue('kernel', 'my_test_kernel', {'is_RT': 0})
x.taggedvalue('arch', 'mips')
x.closeblock()
x.openblock('hardware')
x.taggedvalue('cpu_cores', 2)
x.taggedvalue('memory_size', 1024*1024*2)
x.closeblock()
x.openblock('loads', {'load_average': 3.29})
x.taggedvalue('command_line', './load/loader --extreme --ultimate --threads 4096',
{'name': 'heavyloader'})
x.taggedvalue('command_line', 'dd if=/dev/zero of=/dev/null', {'name': 'lightloader'})
x.closeblock()
x.close()
print("------------- XML OUTPUT ----------------------------")
x.Write("-")
print("------------- XSLT PARSED OUTPUT --------------------")
x.Write("-", "rteval_text.xsl")
print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
x.Write("/tmp/xmlout-test.xml")
del x
print("------------- LOAD XML FROM FILE -----------------------------")
x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8')
x.LoadReport("/tmp/xmlout-test.xml", True)
print("------------- LOADED XML DATA --------------------------------")
x.Write("-")
print("------------- XSLT PARSED OUTPUT FROM LOADED XML--------------")
x.Write("-", "rteval_text.xsl")
x.close()
## Test new data parser ... it eats most data types
print("------------- TESTING XMLOut::ParseData() --------------")
x.NewReport()
x.ParseData("ParseTest", "test string", {"type": "simple_string"})
x.ParseData("ParseTest", 1234, {"type": "integer"})
x.ParseData("ParseTest", 39.3904, {"type": "float"})
x.ParseData("ParseTest", (11, 22, 33, 44, 55), {"type": "tuples"})
x.ParseData("ParseTest", (99, 88, 77), {"type": "tuples", "comment": "Changed default tuple tag name"}, "int_values")
test = {"var1": "value 1",
"var2": {"varA1": 1,
"pi": 3.1415926,
"varA3": (1,
2,
{"test1": "val1"},
(4.1, 4.2, 4.3),
5),
"varA4": {'another_level': True,
'another_value': "blabla"}
},
"utf8 data": 'æøå',
"løpe": True}
x.ParseData("ParseTest", test, {"type": "dict"}, prefix="test ")
x.close()
x.Write("-")
return 0
except Exception as e:
print("** EXCEPTION %s", str(e))
return 1
if __name__ == '__main__':
sys.exit(unit_test('..'))