blob: ddc29643e9cac859e837adbbf657bd2716403ffa [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2009 - 2013 Clark Williams <>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# For the avoidance of doubt the "preferred form" of this code is one which
# is in an open unpatent encumbered format. Where cryptographic key signing
# forms part of the process of creating an executable the information
# including keys needed to generate an equivalently functional executable
# are deemed to be part of the source code.
import os
import sys
import libxml2
import lxml.etree
import codecs
import re
from string import maketrans
def convert_libxml2_to_lxml_doc(inxml):
"Converts a libxml2.xmlDoc into a lxml.etree document object"
if not isinstance(inxml, libxml2.xmlDoc):
raise TypeError('Function requires an libxml2.xmlDoc as input')
root = inxml.getRootElement()
ret = lxml.etree.XML(root.serialize('UTF-8'))
del root
return ret
def convert_lxml_to_libxml2_nodes(inlxml):
"Converts a lxml.etree elements tree into a libxml2.xmlNode object"
if not isinstance(inlxml,lxml.etree._Element) and not isinstance(inlxml, lxml.etree._XSLTResultTree):
raise TypeError('Function requires an lxml.etree object as input')
return libxml2.parseDoc(lxml.etree.tostring(inlxml)).getRootElement()
class XMLOut(object):
'''Class to create XML output'''
def __init__(self, roottag, version, attr = None, encoding='UTF-8'):
self.level = 0
self.encoding = encoding
self.rootattr = attr
self.version = version
self.status = 0 # 0 - no report created/loaded, 1 - new report, 2 - loaded report, 3 - XML closed
self.tag_trans = self.__setup_tag_trans()
self.roottag = self.__fixtag(roottag)
self.xmldoc = None
def __del__(self):
if self.level > 0:
raise RuntimeError, "XMLOut: open blocks at __del__ (last opened '%s')" %
if self.xmldoc is not None:
def __setup_tag_trans(self):
t = maketrans('', '')
t = t.replace(' ', '_')
t = t.replace('\t', '_')
t = t.replace('(', '_')
t = t.replace(')', '_')
t = t.replace(':', '-')
return t
def __fixtag(self, tagname):
if not isinstance(tagname, str):
return str(tagname)
return tagname.translate(self.tag_trans)
def __encode(self, value, tagmode = False):
if type(value) is unicode:
val = value
elif type(value) is str:
val = unicode(value)
val = unicode(str(value))
if tagmode is True:
rx = re.compile(" ")
val = rx.sub("_", val)
# libxml2 uses UTF-8 internally and must have
# all input as UTF-8.
return val.encode('utf-8')
def __add_attributes(self, node, attr):
if attr is not None:
for k, v in attr.iteritems():
node.newProp(k, self.__encode(v))
def __parseToXML(self, node, data):
# All supported variable types needs to be set up
# here. TypeError exception will be raised on
# unknown types.
t = type(data)
if t is unicode or t is str or t is int or t is float:
n = libxml2.newText(self.__encode(data))
elif t is bool:
v = data and "1" or "0"
n = libxml2.newText(self.__encode(v))
elif t is dict:
for (key, val) in data.iteritems():
node2 = libxml2.newNode(self.__encode(self.parsedata_prefix + key, True))
self.__parseToXML(node2, val)
elif t is tuple:
for v in data:
if type(v) is dict:
self.__parseToXML(node, v)
n = libxml2.newNode(self.tuple_tagname)
self.__parseToXML(n, v)
raise TypeError, "unhandled type (%s) for value '%s'" % (type(data), unicode(data))
def close(self):
if self.status == 0:
raise RuntimeError, "XMLOut: No XML document is created nor loaded"
if self.status == 3:
raise RuntimeError, "XMLOut: XML document already closed"
if self.level > 0:
raise RuntimeError, "XMLOut: open blocks at close() (last opened '%s')" %
if self.status == 1: # Only set the root node in the doc on created reports (NewReport called)
self.status = 3
def NewReport(self):
if self.status != 0 and self.status != 3:
raise RuntimeError, "XMLOut: Cannot start a new report without closing the currently opened one"
if self.status == 3:
self.xmldoc.freeDoc() # Free the report from memory if we have one already
self.xmldoc = libxml2.newDoc("1.0")
self.xmlroot = libxml2.newNode(self.roottag)
self.__add_attributes(self.xmlroot, {'version': self.version})
self.__add_attributes(self.xmlroot, self.rootattr)
self.currtag = self.xmlroot
self.level = 0
self.status = 1
def LoadReport(self, filename, validate_version = False):
if self.status == 3:
self.xmldoc.freeDoc() # Free the report from memory if we have one already
self.xmldoc = libxml2.parseFile(filename)
if != filename:
self.status = 3
raise RuntimeError, "XMLOut: Loading report failed"
root = self.xmldoc.children
if != self.roottag:
self.status = 3
raise RuntimeError, "XMLOut: Loaded report is not a valid %s XML file" % self.roottag
if validate_version is True:
ver = root.hasProp('version')
if ver is None:
self.status = 3
raise RuntimeError, "XMLOut: Loaded report is missing version attribute in root node"
if ver.getContent() != self.version:
self.status = 3
raise RuntimeError, "XMLOut: Loaded report is not of version %s" % self.version
self.status = 2 # Confirm that we have loaded a report from file
def Write(self, filename, xslt = None):
if self.status != 2 and self.status != 3:
raise RuntimeError, "XMLOut: XML document is not closed"
if xslt == None:
# If no XSLT template is give, write raw XML
self.xmldoc.saveFormatFileEnc(filename, self.encoding, 1)
# Load XSLT file and prepare the XSLT parser
xsltfile = open(xslt, 'r')
xsltdoc = lxml.etree.parse(xsltfile)
parser = lxml.etree.XSLT(xsltdoc)
# imitate libxml2's filename interpretation
if filename != "-":
dstfile =, "w", encoding=self.encoding)
dstfile = sys.stdout
# Parse XML+XSLT and write the result to file
xmldoc = convert_libxml2_to_lxml_doc(self.xmldoc)
resdoc = parser(xmldoc)
# Write the file with the requested output encoding
if dstfile != sys.stdout:
# Clean up
del resdoc
del xsltdoc
del parser
del xsltfile
del xmldoc
def GetXMLdocument(self):
if self.status != 2 and self.status != 3:
raise RuntimeError, "XMLOut: XML document is not closed"
return self.xmldoc
def openblock(self, tagname, attributes=None):
if self.status != 1:
raise RuntimeError, "XMLOut: openblock() cannot be called before NewReport() is called"
ntag = libxml2.newNode(self.__fixtag(tagname));
self.__add_attributes(ntag, attributes)
self.currtag = ntag
self.level += 1
return ntag
def closeblock(self):
if self.status != 1:
raise RuntimeError, "XMLOut: closeblock() cannot be called before NewReport() is called"
if self.level == 0:
raise RuntimeError, "XMLOut: no open tags to close"
self.currtag = self.currtag.get_parent()
self.level -= 1
return self.currtag
def taggedvalue(self, tag, value, attributes=None):
if self.status != 1:
raise RuntimeError, "XMLOut: taggedvalue() cannot be called before NewReport() is called"
ntag = self.currtag.newTextChild(None, self.__fixtag(tag), self.__encode(value))
self.__add_attributes(ntag, attributes)
return ntag
def ParseData(self, tagname, data, attributes=None, tuple_tagname="tuples", prefix = ""):
if self.status != 1:
raise RuntimeError, "XMLOut: taggedvalue() cannot be called before NewReport() is called"
self.tuple_tagname = self.__fixtag(tuple_tagname)
self.parsedata_prefix = prefix
ntag = libxml2.newNode(self.__fixtag(tagname))
self.__add_attributes(ntag, attributes)
self.__parseToXML(ntag, data)
return ntag
def AppendXMLnodes(self, nodes):
if not isinstance(nodes, libxml2.xmlNode):
raise ValueError, "Input value is not a libxml2.xmlNode"
return self.currtag.addChild(nodes)
def unit_test(rootdir):
x = XMLOut('rteval', 'UNIT-TEST', None, 'UTF-8')
x.openblock('run_info', {'days': 0, 'hours': 0, 'minutes': 32, 'seconds': 18})
x.taggedvalue('time', '11:22:33')
x.taggedvalue('date', '2000-11-22')
x.taggedvalue('node', u'testing - \xe6\xf8')
x.taggedvalue('kernel', 'my_test_kernel', {'is_RT': 0})
x.taggedvalue('arch', 'mips')
x.taggedvalue('cpu_cores', 2)
x.taggedvalue('memory_size', 1024*1024*2)
x.openblock('loads', {'load_average': 3.29})
x.taggedvalue('command_line','./load/loader --extreme --ultimate --threads 4096',
{'name': 'heavyloader'})
x.taggedvalue('command_line','dd if=/dev/zero of=/dev/null', {'name': 'lightloader'})
print "------------- XML OUTPUT ----------------------------"
print "------------- XSLT PARSED OUTPUT --------------------"
x.Write("-", "rteval_text.xsl")
print "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
del x
print "------------- LOAD XML FROM FILE -----------------------------"
x = XMLOut('rteval','UNIT-TEST', None, 'UTF-8')
x.LoadReport("/tmp/xmlout-test.xml", True)
print "------------- LOADED XML DATA --------------------------------"
print "------------- XSLT PARSED OUTPUT FROM LOADED XML--------------"
x.Write("-", "rteval_text.xsl")
## Test new data parser ... it eats most data types
print "------------- TESTING XMLOut::ParseData() --------------"
x.ParseData("ParseTest", "test string", {"type": "simple_string"})
x.ParseData("ParseTest", 1234, {"type": "integer"})
x.ParseData("ParseTest", 39.3904, {"type": "float"})
x.ParseData("ParseTest", (11,22,33,44,55), {"type": "tuples"})
x.ParseData("ParseTest", (99,88,77), {"type": "tuples", "comment": "Changed default tuple tag name"},
test = {"var1": "value 1",
"var2": { "varA1": 1,
"pi": 3.1415926,
"varA3": (1,
{"test1": "val1"},
"varA4": {'another_level': True,
'another_value': "blabla"}
"utf8 data": u'æøå',
u"løpe": True}
x.ParseData("ParseTest", test, {"type": "dict"}, prefix="test ")
return 0
except Exception, e:
print "** EXCEPTION %s", str(e)
return 1
if __name__ == '__main__':