| # -*- coding: utf-8 -*- |
| # SPDX-License-Identifier: GPL-2.0-or-later |
| # |
| # Copyright 2009 - 2013 Clark Williams <williams@redhat.com> |
| # |
| |
| import sys |
| import re |
| import codecs |
| import libxml2 |
| import lxml.etree |
| |
def convert_libxml2_to_lxml_doc(inxml):
    """Convert a libxml2.xmlDoc into an lxml.etree element.

    The root element of *inxml* is serialized as UTF-8 and re-parsed with
    lxml.etree.XML(), so the result is an independent copy.

    Raises TypeError when *inxml* is not a libxml2.xmlDoc.
    """
    if isinstance(inxml, libxml2.xmlDoc):
        serialized = inxml.getRootElement().serialize('UTF-8')
        return lxml.etree.XML(serialized)

    raise TypeError('Function requires an libxml2.xmlDoc as input')
| |
def convert_lxml_to_libxml2_nodes(inlxml):
    """Convert an lxml.etree element tree into a libxml2.xmlNode.

    The lxml object is serialized with lxml.etree.tostring(), parsed again
    by libxml2 and the root element of the resulting document is returned.

    Raises TypeError when *inlxml* is neither an lxml element nor an XSLT
    result tree.
    """
    accepted_types = (lxml.etree._Element, lxml.etree._XSLTResultTree)
    if not isinstance(inlxml, accepted_types):
        raise TypeError('Function requires an lxml.etree object as input')

    xmltext = lxml.etree.tostring(inlxml).decode()
    parsed = libxml2.parseDoc(xmltext)
    return parsed.getRootElement()
| |
class XMLOut:
    '''Class to create XML output

    A report is either built in memory (NewReport(), openblock(),
    taggedvalue(), ParseData(), closeblock(), close()) or loaded from a
    file (LoadReport()).  Once closed or loaded it can be written with
    Write() or fetched with GetXMLdocument().
    '''

    def __init__(self, roottag, version, attr=None, encoding='UTF-8'):
        """Prepare an empty writer; no XML document exists until
        NewReport() or LoadReport() is called.

        roottag  -- name of the root element
        version  -- value of the root element's 'version' attribute
        attr     -- optional dict of extra root element attributes
        encoding -- output encoding used by Write()
        """
        # Set first so that __del__ is safe even if the rest of __init__ fails
        self.xmldoc = None
        self.level = 0
        self.encoding = encoding
        self.rootattr = attr
        self.version = version
        # 0 - no report created/loaded
        # 1 - new report
        # 2 - loaded report
        # 3 - XML closed
        self.status = 0
        self.tag_trans = self.__setup_tag_trans()
        self.roottag = self.__fixtag(roottag)

    def __del__(self):
        """Free the libxml2 document and complain about unclosed blocks."""
        # Release the document before the diagnostic below, otherwise the
        # raise would skip the free and leak the document.
        self.__free_xmldoc()
        if getattr(self, 'level', 0) > 0:
            raise RuntimeError(f"XMLOut: open blocks at __del__ (last opened '{self.currtag.name}')")

    def __free_xmldoc(self):
        """Free the current libxml2 document, if any, and forget it.

        Clearing the reference prevents a second freeDoc() on the same
        document from a later NewReport(), LoadReport() or __del__().
        """
        doc = getattr(self, 'xmldoc', None)
        self.xmldoc = None
        if doc is not None:
            doc.freeDoc()

    def __setup_tag_trans(self):
        """Return the translation table applied to tag names by __fixtag().

        The table is currently empty, so tag names pass through unchanged.
        """
        return str.maketrans('', '')

    def __fixtag(self, tagname):
        """Return *tagname* as a string run through the tag translation table.

        Non-string values are converted with str() and returned untranslated.
        """
        if not isinstance(tagname, str):
            return str(tagname)
        return tagname.translate(self.tag_trans)

    def __encode(self, value, tagmode=False):
        """Return *value* as a string.

        With tagmode=True spaces are replaced by underscores so the result
        can be used as an element name.
        """
        val = value if isinstance(value, str) else str(value)
        if tagmode is True:
            val = val.replace(" ", "_")
        return val

    def __add_attributes(self, node, attr):
        """Add every key/value pair of the dict *attr* (may be None) as a
        property on *node*."""
        if attr is None:
            return
        for key, value in attr.items():
            node.newProp(key, self.__encode(value))

    def __parseToXML(self, node, data):
        """Recursively append *data* to *node*.

        str, int, float and bool become text nodes (bool as "1"/"0"), a
        dict becomes one child element per key and a tuple becomes one
        self.tuple_tagname element per item (dict items are expanded in
        place).  The exact type is checked, so subclasses are rejected.

        Raises TypeError on any other type.
        """
        # All supported variable types needs to be set up
        # here.  TypeError exception will be raised on
        # unknown types.
        t = type(data)
        if t in (str, int, float):
            node.addChild(libxml2.newText(self.__encode(data)))
        elif t is bool:
            node.addChild(libxml2.newText("1" if data else "0"))
        elif t is dict:
            for key, val in data.items():
                child = libxml2.newNode(self.__encode(self.parsedata_prefix + key, True))
                self.__parseToXML(child, val)
                node.addChild(child)
        elif t is tuple:
            for item in data:
                if isinstance(item, dict):
                    self.__parseToXML(node, item)
                else:
                    child = libxml2.newNode(self.tuple_tagname)
                    self.__parseToXML(child, item)
                    node.addChild(child)
        else:
            raise TypeError(f"unhandled type ({str(type(data))}) for value '{str(data)}'")

    def close(self):
        """Close the report so that it can be written.

        Raises RuntimeError when no report exists, when the report is
        already closed or when blocks are still open.
        """
        if self.status == 0:
            raise RuntimeError("XMLOut: No XML document is created nor loaded")
        if self.status == 3:
            raise RuntimeError("XMLOut: XML document already closed")
        if self.level > 0:
            raise RuntimeError(f"XMLOut: open blocks at close() (last opened '{self.currtag.name}')")

        if self.status == 1:  # Only set the root node in the doc on created reports (NewReport called)
            self.xmldoc.setRootElement(self.xmlroot)
        self.status = 3

    def NewReport(self):
        """Start a new, empty report containing only the root element.

        Raises RuntimeError when a report is currently open or loaded and
        has not been closed.
        """
        if self.status != 0 and self.status != 3:
            raise RuntimeError("XMLOut: Cannot start a new report without closing the currently opened one")

        if self.status == 3:
            self.__free_xmldoc()  # Free the report from memory if we have one already

        self.xmldoc = libxml2.newDoc("1.0")
        self.xmlroot = libxml2.newNode(self.roottag)
        self.__add_attributes(self.xmlroot, {'version': self.version})
        self.__add_attributes(self.xmlroot, self.rootattr)
        self.currtag = self.xmlroot
        self.level = 0
        self.status = 1

    def LoadReport(self, filename, validate_version=False):
        """Load a report from *filename*.

        The root element must be named like this object's root tag.  With
        validate_version=True the root's 'version' attribute must also
        match self.version.

        Raises RuntimeError when a check fails; the status is then set to
        3 (closed).
        """
        if self.status == 3:
            self.__free_xmldoc()  # Free the report from memory if we have one already
            # No document is held any more; keeps the state consistent
            # should the parse below raise.
            self.status = 0

        self.xmldoc = libxml2.parseFile(filename)
        if self.xmldoc.name != filename:
            self.status = 3
            raise RuntimeError("XMLOut: Loading report failed")

        # Use the root element rather than the first child of the document:
        # the first child may be a comment, processing instruction or DTD.
        root = self.xmldoc.getRootElement()
        if root is None or root.name != self.roottag:
            self.status = 3
            raise RuntimeError(f"XMLOut: Loaded report is not a valid {self.roottag} XML file")

        if validate_version is True:
            ver = root.hasProp('version')

            if ver is None:
                self.status = 3
                raise RuntimeError("XMLOut: Loaded report is missing version attribute in root node")

            if ver.getContent() != self.version:
                self.status = 3
                raise RuntimeError(f"XMLOut: Loaded report is not of version {self.version}")

        self.status = 2  # Confirm that we have loaded a report from file

    def Write(self, filename, xslt=None):
        """Write the report to *filename* ("-" means standard output).

        Without *xslt* the raw XML is saved in self.encoding.  With *xslt*
        (path to an XSLT template) the transformed result is written.

        Raises RuntimeError unless the report is loaded or closed.
        """
        if self.status != 2 and self.status != 3:
            raise RuntimeError("XMLOut: XML document is not closed")

        if xslt is None:
            # If no XSLT template is given, write raw XML
            self.xmldoc.saveFormatFileEnc(filename, self.encoding, 1)
            return

        # Load XSLT file and prepare the XSLT parser
        with open(xslt, 'r') as xsltfile:
            parser = lxml.etree.XSLT(lxml.etree.parse(xsltfile))

        # Parse XML+XSLT before touching the destination, so a failing
        # transformation does not leave a truncated output file behind.
        result = str(parser(convert_libxml2_to_lxml_doc(self.xmldoc)))

        # imitate libxml2's filename interpretation
        if filename == "-":
            sys.stdout.write(result)
            return

        # The codec writer applies the requested output encoding; the text
        # must not be encoded/decoded by hand beforehand.
        with codecs.open(filename, "w", encoding=self.encoding) as dstfile:
            dstfile.write(result)

    def GetXMLdocument(self):
        """Return the libxml2 document of a loaded or closed report.

        Raises RuntimeError when the report is in any other state.
        """
        if self.status != 2 and self.status != 3:
            raise RuntimeError("XMLOut: XML document is not closed")
        return self.xmldoc

    def openblock(self, tagname, attributes=None):
        """Open a new element below the current one and make it current.

        Returns the new node.  Raises RuntimeError unless a new report is
        being built.
        """
        if self.status != 1:
            raise RuntimeError("XMLOut: openblock() cannot be called before NewReport() is called")
        ntag = libxml2.newNode(self.__fixtag(tagname))
        self.__add_attributes(ntag, attributes)
        self.currtag.addChild(ntag)
        self.currtag = ntag
        self.level += 1
        return ntag

    def closeblock(self):
        """Close the current element and return its parent, which becomes
        the current element.

        Raises RuntimeError unless a new report is being built, or when no
        block is open.
        """
        if self.status != 1:
            raise RuntimeError("XMLOut: closeblock() cannot be called before NewReport() is called")
        if self.level == 0:
            raise RuntimeError("XMLOut: no open tags to close")
        self.currtag = self.currtag.get_parent()
        self.level -= 1
        return self.currtag

    def taggedvalue(self, tag, value, attributes=None):
        """Add a <tag>value</tag> element below the current element.

        Returns the new node.  Raises RuntimeError unless a new report is
        being built.
        """
        if self.status != 1:
            raise RuntimeError("XMLOut: taggedvalue() cannot be called before NewReport() is called")
        ntag = self.currtag.newTextChild(None, self.__fixtag(tag), self.__encode(value))
        self.__add_attributes(ntag, attributes)
        return ntag

    def ParseData(self, tagname, data, attributes=None, tuple_tagname="tuples", prefix=""):
        """Add *data* as a *tagname* element below the current element.

        data          -- str, int, float, bool, dict or tuple (nested)
        attributes    -- optional dict of attributes for the new element
        tuple_tagname -- element name used for each tuple item
        prefix        -- string prepended to every dict key used as a tag

        Returns the new node.  Raises RuntimeError unless a new report is
        being built and TypeError on unsupported data types.
        """
        if self.status != 1:
            raise RuntimeError("XMLOut: ParseData() cannot be called before NewReport() is called")

        # Read by __parseToXML() while it walks the data
        self.tuple_tagname = self.__fixtag(tuple_tagname)
        self.parsedata_prefix = prefix

        ntag = libxml2.newNode(self.__fixtag(tagname))
        self.__add_attributes(ntag, attributes)
        self.__parseToXML(ntag, data)
        self.currtag.addChild(ntag)
        return ntag

    def AppendXMLnodes(self, nodes):
        """Append an existing libxml2.xmlNode below the current element and
        return the result of addChild().

        Unlike the other builder methods this one does not check the
        report status.  Raises ValueError when *nodes* is not a
        libxml2.xmlNode.
        """
        if not isinstance(nodes, libxml2.xmlNode):
            raise ValueError("Input value is not a libxml2.xmlNode")

        return self.currtag.addChild(nodes)
| |
def unit_test(rootdir):
    """Exercise XMLOut: build, write, reload and re-write a report.

    rootdir -- accepted for the caller's benefit; not used by this function.

    Output goes to standard output and to /tmp/xmlout-test.xml; the XSLT
    steps read "rteval_text.xsl" relative to the current directory.

    Returns 0 on success and 1 when any exception was raised.
    """
    try:
        x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8')
        x.NewReport()
        x.openblock('run_info', {'days': 0, 'hours': 0, 'minutes': 32, 'seconds': 18})
        x.taggedvalue('time', '11:22:33')
        x.taggedvalue('date', '2000-11-22')
        x.closeblock()
        x.openblock('uname')
        x.taggedvalue('node', 'testing - \xe6\xf8')
        x.taggedvalue('kernel', 'my_test_kernel', {'is_RT': 0})
        x.taggedvalue('arch', 'mips')
        x.closeblock()
        x.openblock('hardware')
        x.taggedvalue('cpu_cores', 2)
        x.taggedvalue('memory_size', 1024*1024*2)
        x.closeblock()
        x.openblock('loads', {'load_average': 3.29})
        x.taggedvalue('command_line', './load/loader --extreme --ultimate --threads 4096',
                      {'name': 'heavyloader'})
        x.taggedvalue('command_line', 'dd if=/dev/zero of=/dev/null', {'name': 'lightloader'})
        x.closeblock()
        x.close()
        print("------------- XML OUTPUT ----------------------------")
        x.Write("-")
        print("------------- XSLT PARSED OUTPUT --------------------")
        x.Write("-", "rteval_text.xsl")
        print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
        x.Write("/tmp/xmlout-test.xml")
        del x

        print("------------- LOAD XML FROM FILE -----------------------------")
        x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8')
        x.LoadReport("/tmp/xmlout-test.xml", True)
        print("------------- LOADED XML DATA --------------------------------")
        x.Write("-")
        print("------------- XSLT PARSED OUTPUT FROM LOADED XML--------------")
        x.Write("-", "rteval_text.xsl")
        x.close()

        ## Test new data parser ... it eats most data types
        print("------------- TESTING XMLOut::ParseData() --------------")
        x.NewReport()
        x.ParseData("ParseTest", "test string", {"type": "simple_string"})
        x.ParseData("ParseTest", 1234, {"type": "integer"})
        x.ParseData("ParseTest", 39.3904, {"type": "float"})
        x.ParseData("ParseTest", (11, 22, 33, 44, 55), {"type": "tuples"})
        x.ParseData("ParseTest", (99, 88, 77), {"type": "tuples", "comment": "Changed default tuple tag name"}, "int_values")
        test = {"var1": "value 1",
                "var2": {"varA1": 1,
                         "pi": 3.1415926,
                         "varA3": (1,
                                   2,
                                   {"test1": "val1"},
                                   (4.1, 4.2, 4.3),
                                   5),
                         "varA4": {'another_level': True,
                                   'another_value': "blabla"}
                         },
                "utf8 data": 'æøå',
                "løpe": True}
        x.ParseData("ParseTest", test, {"type": "dict"}, prefix="test ")
        x.close()
        x.Write("-")
        return 0
    except Exception as e:
        # print() does no %-interpolation; format the message explicitly
        print(f"** EXCEPTION {e}")
        return 1
| |
# Script entry point: run the self test and use its result as exit status
if __name__ == '__main__':
    exit_status = unit_test('..')
    sys.exit(exit_status)