blob: b22e5833cbe7f6d2eccff9d65dcf50ffee84945e [file] [log] [blame]
import sys, threading, time, signal, libxml2
from optparse import OptionParser
from rteval_testserver import RTevald
from Logger import Logger
sys.path.insert(0,'..')
sys.path.insert(0,'../rteval')
sys.path.insert(0,'rteval')
import rtevalclient
class ServerThread(threading.Thread):
def __init__(self, port):
threading.Thread.__init__(self)
self.port = port
self.log = Logger('unit-test-server.log','rteval-xmlrpc-testsrv')
parser = OptionParser()
parser.add_option("-L", "--listen", action="store", dest="listen", default="127.0.0.1",
help="Which interface to listen to [default: %default]", metavar="IPADDR")
parser.add_option("-P", "--port", action="store", type="int", dest="port", default=self.port,
help="Which port to listen to [default: %default]", metavar="PORT")
(options, args) = parser.parse_args()
self.child = RTevald(options, self.log)
def run(self):
self.child.StartServer()
def stop(self):
self.child.StopServer()
def sigcatch(self, signum, frame):
print "Shutting down"
self.stop()
class ClientTest(object):
def __init__(self, port):
self.client = rtevalclient.rtevalclient("http://localhost:%s/rteval/API1/" % port)
def __prepare_data(self):
d = libxml2.newDoc("1.0")
n = libxml2.newNode('TestNode1')
d.setRootElement(n)
n2 = n.newTextChild(None, 'TestNode2','Just a little test')
n2.newProp('test','true')
for i in range(1,5):
n2 = n.newTextChild(None, 'TestNode3', 'Test line %i' %i)
self.testdoc = d
def RunTest(self):
try:
print "** Creating XML test document"
self.__prepare_data()
self.testdoc.saveFormatFileEnc('-','UTF-8', 1)
print "** Client test [1]: Hello(): %s" % str(self.client.Hello())
status = self.client.SendReport(self.testdoc)
print "** Client test [2]; SendReport(xmlDoc): %s" % str(status)
except Exception, e:
raise Exception("XML-RPC client test failed: %s" % str(e))
def unit_test(rootdir):
ret = 1
try:
# Prepare server and client objects
srvthread = ServerThread('65432')
clienttest = ClientTest('65432')
signal.signal(signal.SIGINT, srvthread.sigcatch)
# Start a local XML-RPC test server
srvthread.start()
print "** Waiting 2 seconds for server to settle"
time.sleep(2)
# Start the client test
print "** Starting client tests"
clienttest.RunTest()
ret = 0
except Exception, e:
print "** EXCEPTION: %s" % str(e)
finally:
# Stop the local XML-RPC test server
srvthread.stop()
return ret
if __name__ == "__main__":
sys.exit(unit_test('..'))