| #!/usr/bin/python3 |
| |
| import os |
| import shutil |
| import sys |
| import subprocess |
| import atexit |
| import time |
| import unittest |
| import importlib |
| from unittest.result import TestResult |
| import multiprocessing |
| import re |
| import traceback |
| |
| from configparser import ConfigParser |
| from prettytable import PrettyTable |
| from termcolor import colored |
| from glob import glob |
| from collections import namedtuple |
| import dbus.mainloop.glib |
| from gi.repository import GLib |
| |
| from runner import Runner |
| from utils import Process, Namespace, BarChart |
| |
# Handle to the autotest 'config' module; stays None until run_tests()
# imports it
config = None
# Counter used by Radio.create_interface() to build unique 'wlnN' names
intf_id = 0

# Make the GLib main loop the default main loop for dbus-python
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
| |
def dbg(*s, **kwargs):
    '''
        Print directly to the interpreter's original stdout so the
        message is still shown when sys.stdout has been re-directed.
        Accepts the same positional/keyword arguments as print(),
        except 'file'.
    '''
    original_stdout = sys.__stdout__
    print(*s, file=original_stdout, **kwargs)
| |
| def write_results(file, results): |
| with open(file, 'w') as f: |
| for test, results in results.items(): |
| if results.failures != 0 or results.errors != 0: |
| f.write('%s:FAIL\n' % test) |
| else: |
| f.write('%s:PASS\n' % test) |
| |
def exit_vm():
    '''
        Exit handler: kill any process still running, print the test
        results (and write them to the result file when requested),
        sync filesystems and stop the runner.
    '''
    # 'config' is only set once run_tests() has imported it; without it
    # there are no tracked processes or results to report
    if config:
        for p in Process.get_all():
            print("Process %s still running!" % p.args[0])
            p.kill()

        if config.ctx and config.ctx.results:
            print_results(config.ctx.results, int(config.ctx.args.timeout))

            if config.ctx.args.result:
                write_results(config.ctx.args.result, config.ctx.results)

    os.sync()

    runner.stop()
| |
class Interface:
    '''
        A wireless interface together with the config it was created
        for and the namespace ('ns') its commands are run in. The
        interface is deleted with 'iw' when the object is destroyed.
    '''
    def __init__(self, name, config, ns):
        self.name = name
        self.config = config
        self.ns = ns
        # Per-interface control socket path inside hostapd's ctrl directory
        self.ctrl_interface = '/var/run/hostapd/' + name

    def __del__(self):
        delete_cmd = ['iw', 'dev', self.name, 'del']
        Process(delete_cmd, namespace=self.ns.name).wait()

    def set_interface_state(self, state):
        '''
            Run 'ip link set <name> <state>' and wait for it to finish.
        '''
        link_cmd = ['ip', 'link', 'set', self.name, state]
        Process(link_cmd, namespace=self.ns.name).wait()
| |
class Radio:
    '''
        A wireless PHY: tracks what is using it, the interface created
        on it (if any) and the namespace it currently belongs to.
    '''
    def __init__(self, name, default_ns, used_by='iwd'):
        self.name = name
        self.ns = default_ns
        self.interface = None
        # hostapd will reset this if this radio is used by it
        self.use = used_by

    def __del__(self):
        print("Removing radio %s" % self.name)
        self.interface = None

    def set_namespace(self, ns):
        '''
            Record 'ns' as the radio's namespace and move the phy
            into it with 'iw'.
        '''
        self.ns = ns
        move_cmd = ['iw', 'phy', self.name, 'set', 'netns', 'name', ns.name]
        Process(move_cmd).wait()

    def create_interface(self, config, use):
        '''
            Create a managed interface named 'wln<N>' on this radio,
            where N comes from the module wide 'intf_id' counter.
            Returns the new Interface object.
        '''
        global intf_id

        ifname = 'wln%s' % intf_id
        intf_id += 1

        self.interface = Interface(ifname, config, self.ns)
        self.use = use

        add_cmd = ['iw', 'phy', self.name, 'interface', 'add', ifname,
                    'type', 'managed']
        Process(add_cmd, namespace=self.ns.name).wait()

        return self.interface

    def __str__(self):
        pieces = [self.name + ':\n', '\tUsed By: %s' % self.use]

        if self.interface:
            pieces.append(' (%s)' % self.interface.name)

        if self.ns is not None:
            pieces.append(' (ns=%s)' % self.ns.name)

        pieces.append('\n')

        return ''.join(pieces)
| |
class VirtualRadio(Radio):
    '''
        A subclass of 'Radio' specific to mac80211_hwsim radios.

        TODO: Using D-Bus to create and destroy radios is more desirable
        than the command line.
    '''

    def __init__(self, name, default_ns, cfg=None):
        '''
            name - requested radio name
            default_ns - namespace the radio starts out in
            cfg - optional mapping (e.g. a [radX] config section) that
                  may provide 'iftype_disable', 'cipher_disable' and
                  'reserve' (who the radio is used by, default 'iwd')
        '''
        self.disable_cipher = None
        self.disable_iftype = None

        self.hwsim = config.hwsim.Hwsim()

        used_by = 'iwd'

        if cfg:
            self.disable_iftype = cfg.get('iftype_disable', None)
            self.disable_cipher = cfg.get('cipher_disable', None)
            used_by = cfg.get('reserve', 'iwd')

        self._radio = self.hwsim.radios.create(name, p2p_device=True,
                                iftype_disable=self.disable_iftype,
                                cipher_disable=self.disable_cipher)

        # The name reported by the created radio is the one tracked
        super().__init__(self._radio.name, default_ns, used_by)

    def __del__(self):
        # __init__ may have failed before the radio was created, in
        # which case there is nothing to tear down
        radio = getattr(self, '_radio', None)
        if radio is None:
            return

        super().__del__()

        # If the radio was moved into a namespace this will fail.
        # Removal is best effort, but interpreter level exceptions
        # (KeyboardInterrupt, SystemExit) are no longer swallowed.
        try:
            radio.remove()
        except Exception:
            pass

        self._radio = None

    def __str__(self):
        ret = super().__str__()

        if self.disable_iftype:
            ret += '\tDisabled interface types: %s\n' % self.disable_iftype

        if self.disable_cipher:
            ret += '\tDisabled ciphers: %s\n' % self.disable_cipher

        ret += '\tPath: %s' % self._radio.path

        ret += '\n'

        return ret
| |
class HostapdInstance:
    '''
        One hostapd AP. All hostapd instances actually run inside a
        single hostapd process; this object only bundles the config,
        radio, interface and CLI handle belonging to one of the APs.
    '''
    def __init__(self, config, radio):
        self.radio = radio
        self.config = config
        self.cli = None

        # Create the AP interface on the radio and bring it up
        intf = radio.create_interface(config, 'hostapd')
        self.intf = intf
        intf.set_interface_state('up')

    def __del__(self):
        print("Removing HostapdInstance %s" % self.config)
        self.intf.set_interface_state('down')
        self.radio = self.intf = None

    def __str__(self):
        header = 'Hostapd (%s)\n' % self.intf.name
        detail = '\tConfig: %s\n' % self.config

        return header + detail
| |
class Hostapd:
    '''
        A set of running hostapd instances. This is really just a single
        process since hostapd can be started with multiple config files.
    '''
    def __init__(self, ns, radios, configs, radius):
        '''
            ns - namespace object hostapd is started in (ns.name is used)
            radios - radios to create one AP interface on each
            configs - config file names relative to /tmp, one per radio
            radius - optional extra config file appended to the command

            Raises Exception when 'radios' and 'configs' differ in length.
        '''
        if len(configs) != len(radios):
            raise Exception("Config (%d) and radio (%d) list length not equal" % \
                        (len(configs), len(radios)))

        print("Initializing hostapd instances")

        Process(['ip', 'link', 'set', 'eth0', 'up']).wait()
        Process(['ip', 'link', 'set', 'eth1', 'up']).wait()

        self.ns = ns
        self.global_ctrl_iface = '/var/run/hostapd/ctrl' + (str(ns.name) if ns.name else 'main')
        self.instances = [HostapdInstance(c, r) for c, r in zip(configs, radios)]

        ifaces = [rad.interface.name for rad in radios]
        ifaces = ','.join(ifaces)

        args = ['hostapd', '-g', self.global_ctrl_iface]

        if ifaces:
            args.extend(['-i', ifaces])

        #
        # Config files should already be present in /tmp. This appends
        # ctrl_interface and does any variable replacement. Currently
        # this is just any $ifaceN occurrences.
        #
        for c in configs:
            full_path = '/tmp/%s' % c
            args.append(full_path)

            self._rewrite_config(full_path)

        if radius:
            args.append(radius)

        if Process.is_verbose('hostapd'):
            args.append('-d')

        self.process = Process(args, namespace=ns.name)

        self.process.wait_for_socket(self.global_ctrl_iface, 30)

        for hapd in self.instances:
            self.process.wait_for_socket(hapd.intf.ctrl_interface, 30)

    def attach_cli(self):
        '''
            Create a HostapdCLI for every instance and store it in
            the instance's 'cli' attribute.
        '''
        for hapd in self.instances:
            hapd.cli = config.hostapd.HostapdCLI(config=hapd.config)

    def _rewrite_config(self, config):
        '''
            Replaces any $ifaceN values with the correct interface
            names as well as appends the ctrl_interface path to
            the config file. The file is rewritten in place.

            Raises IndexError if N does not refer to an instance.
        '''
        def iface_name(match):
            return self.instances[int(match.group(1))].intf.name

        with open(config, 'r+') as f:
            data = f.read()

            # Matching the whole number keeps e.g. $iface1 from being
            # substituted inside $iface10
            data = re.sub(r'\$iface([0-9]+)', iface_name, data)

            data += '\nctrl_interface=/var/run/hostapd\n'

            # After read() the position is at EOF; rewind so the new
            # content replaces the old instead of being appended to it
            f.seek(0)
            f.write(data)
            f.truncate()

    def __getitem__(self, config):
        '''
            Return the instance using 'config', the first instance
            when 'config' is empty, or None if nothing matches.
        '''
        if not config:
            return self.instances[0]

        for hapd in self.instances:
            if hapd.config == config:
                return hapd

        return None

    def __del__(self):
        print("Removing Hostapd")
        try:
            os.remove(self.global_ctrl_iface)
        except OSError:
            print("Failed to remove %s" % self.global_ctrl_iface)

        for hapd in self.instances:
            # cli is only set once attach_cli() has been called
            if hapd.cli is not None:
                GLib.source_remove(hapd.cli.io_watch)

        # Hostapd may have already been stopped
        if self.process:
            self.process.kill()

        # Hostapd creates simdb sockets for EAP-SIM/AKA tests but does not
        # clean them up.
        for f in glob("/tmp/eap_sim_db*"):
            os.remove(f)
| |
class TestContext(Namespace):
    '''
        Contains all information for a given set of tests being run
        such as processes, radios, interfaces and test results.

        Helpers used below but not defined here (start_process,
        start_dbus, reset, dbus_address, ...) are presumably provided
        by the Namespace base class from utils.
    '''
    def __init__(self, args):
        self.name = None
        self.args = args
        self.hw_config = None
        self.hostapd = None
        self.wpas_interfaces = None
        self.radios = []
        self.results = {}
        self.namespaces = []
        self._last_mem_available = 0
        self._mem_chart = BarChart()

    def start_dbus_monitor(self):
        '''
            Start dbus-monitor on this context's bus, but only when
            verbose output was requested for it.
        '''
        if not Process.is_verbose('dbus-monitor'):
            return

        self.start_process(['dbus-monitor', '--address', self.dbus_address])

    def start_haveged(self):
        self.start_process(['haveged', '-F'])

    def create_radios(self):
        '''
            Start hwsim and create SETUP.num_radios virtual radios
            named rad0, rad1, ...
        '''
        setup = self.hw_config['SETUP']
        nradios = int(setup['num_radios'])
        args = ['hwsim']

        if setup.get('hwsim_medium', 'no') in ['no', '0', 'false']:
            # hwsim_medium is disabled (the default): tell hwsim not to
            # register itself as the medium
            args.extend(['--no-register'])

        proc = self.start_process(args)
        proc.wait_for_service(self, 'net.connman.hwsim', 20)

        for i in range(nradios):
            name = 'rad%u' % i

            # Get any [radX] sections. These are for configuring
            # any special radios. This no longer requires a
            # radio_conf list, we just assume radios start rad0
            # and increment.
            rad_config = None
            if self.hw_config.has_section(name):
                rad_config = self.hw_config[name]

            self.radios.append(VirtualRadio(name, self, rad_config))

    def discover_radios(self):
        '''
            Populate self.radios with one Radio per wiphy name
            reported by pyroute2 (duplicates are ignored).
        '''
        import pyroute2

        phys = []

        # Fall back to the top level IW class when the iwutil one cannot
        # be used. Only ordinary errors trigger the fallback.
        try:
            iw = pyroute2.iwutil.IW()
        except Exception:
            iw = pyroute2.IW()

        attrs = [phy['attrs'] for phy in iw.list_wiphy()]

        for attr in attrs:
            for key, value in attr:
                if key == 'NL80211_ATTR_WIPHY_NAME':
                    if value not in phys:
                        phys.append(value)
                    break

        print('Discovered radios: %s' % str(phys))
        self.radios = [Radio(name, self) for name in phys]

    def start_radios(self):
        '''
            Apply SETUP.reg_domain when set, then either discover
            physical radios (args.hw) or create virtual ones.
        '''
        reg_domain = self.hw_config['SETUP'].get('reg_domain', None)
        if reg_domain:
            Process(['iw', 'reg', 'set', reg_domain]).wait()

        if self.args.hw:
            self.discover_radios()
        else:
            self.create_radios()

    def start_hostapd(self):
        '''
            Start one Hostapd per namespace that has radios listed in
            the [HOSTAPD] section and attach a CLI to every instance.
            Does nothing when hw.conf has no [HOSTAPD] section.
        '''
        if 'HOSTAPD' not in self.hw_config:
            return

        settings = self.hw_config['HOSTAPD']

        if self.args.hw:
            # Just grab the first N radios. It gets rather
            # complicated trying to map radX radios specified in
            # hw.conf so any passed through physical adapters are
            # just given to hostapd/IWD as they appear during
            # discovery.
            #
            # TODO: It may be desirable to map PCI/USB adapters to
            #       specific radX radios specified in the config but
            #       there are really 2 separate use cases here.
            #       1. You want to test a *specific* radio with IWD
            #          or hostapd. For this you would want radX
            #          to map to a specific radio
            #       2. You have many adapters in use to run multiple
            #          tests. In this case you would not care what
            #          was using each radio, just that there was
            #          enough to run all tests.
            hapd_configs = [conf for rad, conf in settings.items() if rad != 'radius_server']
            hapd_processes = [(self, self.radios[:len(hapd_configs)], hapd_configs)]
        else:
            hapd_processes = []
            for ns in [self] + self.namespaces:
                ns_radios = [rad for rad in ns.radios if rad.name in settings]
                if len(ns_radios):
                    ns_configs = [settings[rad.name] for rad in ns_radios]
                    hapd_processes.append((ns, ns_radios, ns_configs))
            if not hapd_processes:
                hapd_processes.append((self, [], []))

        radius_config = settings.get('radius_server', None)

        self.hostapd = [Hostapd(ns, radios, configs, radius_config)
                        for ns, radios, configs in hapd_processes]

        for hapd in self.hostapd:
            hapd.attach_cli()

    def get_frequencies(self):
        '''
            Return the 'frequency' reported by the CLI of every
            hostapd instance.
        '''
        frequencies = []

        for hapd in self.hostapd:
            frequencies += [instance.cli.frequency for instance in hapd.instances]

        return frequencies

    def get_hapd_instance(self, config=None):
        '''
            Return the hostapd instance using 'config', or the first
            instance when 'config' is None. Returns None when no
            instance uses 'config'.
        '''
        instances = [i for hapd in self.hostapd for i in hapd.instances]

        if config is None:
            return instances[0]

        for hapd in instances:
            if hapd.config == config:
                return hapd

    def start_wpas_interfaces(self):
        '''
            Create the interfaces listed in the [WPA_SUPPLICANT]
            section. Skipped when the section or the wpa_supplicant
            binary is missing.
        '''
        if 'WPA_SUPPLICANT' not in self.hw_config:
            return

        if not shutil.which('wpa_supplicant'):
            print('wpa_supplicant not found, dependent tests will be skipped')
            return

        settings = self.hw_config['WPA_SUPPLICANT']

        if self.args.hw:
            nradios = len(settings.items())

            wpas_radios = self.radios[:nradios]
            self.wpas_interfaces = []

            #
            # Physical radios most likely will use a different name
            # than 'rad#' but the config file is referenced by these
            # 'rad#' names. Iterate through both the settings and
            # physical radios to create interfaces associated with
            # each config file.
            #
            for vrad, hwrad in zip(settings.items(), wpas_radios):
                self.wpas_interfaces.append(hwrad.create_interface(vrad[1], 'wpas'))

        else:
            wpas_radios = [rad for rad in self.radios if rad.name in settings]
            self.wpas_interfaces = [rad.create_interface(settings[rad.name], 'wpas') \
                                    for rad in wpas_radios]

    def start_ofono(self):
        '''
            Handle SETUP.sim_keys: unset means ofono is not needed,
            'ofono' starts phonesim and ofonod, any other value is
            exported through the IWD_SIM_KEYS environment variable.
        '''
        sim_keys = self.hw_config['SETUP'].get('sim_keys', None)
        if not sim_keys:
            print("Ofono not required")
            return
        elif sim_keys != 'ofono':
            os.environ['IWD_SIM_KEYS'] = sim_keys
            return

        if not shutil.which('ofonod') or not shutil.which('phonesim'):
            print("Ofono or Phonesim not found, skipping test")
            return

        os.environ['OFONO_PHONESIM_CONFIG'] = '/tmp/phonesim.conf'

        phonesim_args = ['phonesim', '-p', '12345', '/usr/share/phonesim/default.xml']

        self.start_process(phonesim_args)

        #
        # TODO:
        # Is there something to wait for? Without this phonesim rejects
        # connections on all but the first test.
        #
        time.sleep(3)

        ofono_args = ['ofonod', '-n', '--plugin=atmodem,phonesim']
        if Process.is_verbose('ofonod'):
            ofono_args.append('-d')

        self.start_process(ofono_args)

        print("Ofono started")

    def create_namespaces(self):
        '''
            Create a Namespace for every entry in the [NameSpaces]
            section (name = comma separated radio names) and move the
            listed radios out of the root context into it.
        '''
        if not self.hw_config.has_section('NameSpaces'):
            return

        for key, value in self.hw_config.items('NameSpaces'):
            radio_names = [name.strip() for name in value.split(',')]
            # Gather up radio objects for this namespace
            radios = [rad for rad in self.radios if rad.name in radio_names]

            # Remove radios from 'root' namespace. The remaining radios
            # keep their order: later interface creation and $ifaceN
            # substitution depend on it.
            self.radios = [rad for rad in self.radios if rad not in radios]

            self.namespaces.append(Namespace(self.args, key, radios))

    def get_namespace(self, ns):
        '''
            Return the namespace named 'ns', or None.
        '''
        for n in self.namespaces:
            if n.name == ns:
                return n

        return None

    def stop_test_processes(self):
        '''
            Reset every namespace and this context, and drop the
            per-test hostapd/wpa_supplicant state.
        '''
        for n in self.namespaces:
            n.reset()

        self.namespaces = []
        self.hostapd = None
        self.wpas_interfaces = None

        self.reset()

    def meminfo_to_dict(self):
        '''
            Parse /proc/meminfo into a dict mapping each field name
            to its integer value (a trailing 'kB' unit is dropped).
        '''
        def removesuffix(string, suffix):
            if string.endswith(suffix):
                return string[:-len(suffix)]
            return string

        ret = {}

        with open('/proc/meminfo', 'r') as f:
            data = f.read().strip().split('\n')

        for l in data:
            entry = l.split(':')
            # int() tolerates the whitespace left around the number
            ret[entry[0]] = int(removesuffix(entry[1], 'kB'))

        return ret

    def __str__(self):
        '''
            Describe the arguments, hostapd instances, memory usage
            and namespaces. Note this also records the current
            MemAvailable value in the memory chart and as the
            baseline for the next 'Last Test Delta'.
        '''
        ret = 'Arguments:\n'
        for arg in vars(self.args):
            ret += '\t --%s %s\n' % (arg, str(getattr(self.args, arg)))

        if self.hostapd:
            for hapd in self.hostapd:
                ret += 'Hostapd (ns=%s):\n' % (hapd.ns.name,)
                for h in hapd.instances:
                    ret += '\t%s\n' % (str(h),)
        else:
            ret += 'Hostapd:\n'
            ret += '\tNo Hostapd instances\n'

        info = self.meminfo_to_dict()
        self._mem_chart.add_value(info['MemAvailable'])

        ret += 'Available Memory: %u kB\n' % info['MemAvailable']
        ret += 'Last Test Delta: %+d kB\n' % (info['MemAvailable'] - self._last_mem_available)
        ret += 'Per-test Usage:\n'
        ret += str(self._mem_chart)

        self._last_mem_available = info['MemAvailable']

        ret += super().__str__()

        for n in self.namespaces:
            ret += n.__str__()

        return ret
| |
def build_unit_list(args):
    '''
        Return the sorted list of unit test executables selected by
        the comma separated 'args.unit_tests'. Each entry is tried as
        a literal executable path, then as an executable inside
        <args.testhome>/unit, and finally as a glob inside that
        directory (non-executable matches are dropped).

        Raises Exception when an entry matches nothing at all.
    '''
    selected = []
    unit_dir = args.testhome + '/unit'

    for unit in args.unit_tests.split(','):
        candidate = '%s/%s' % (unit_dir, unit)

        if os.access(unit, os.X_OK):
            selected.append(unit)
            continue

        if os.access(candidate, os.X_OK):
            selected.append(candidate)
            continue

        # Full list or glob, first build up valid list of tests
        found = glob(candidate)
        if not found:
            raise Exception("Could not find test %s" % unit)

        selected.extend(exe for exe in found if os.access(exe, os.X_OK))

    return sorted(selected)
| |
def build_test_list(args):
    '''
        Build list of auto test directories based on passed arguments.

        With no 'args.autotests' every git tracked test directory under
        <args.testhome>/autotests is returned. Otherwise each comma
        separated entry is resolved in this order:
            <name>+   - <name> and every 'test*' directory sorting after it
            existing path (absolute or relative to the cwd)
            directory inside <args.testhome>/autotests
            glob inside <args.testhome>/autotests

        Returns the sorted list without duplicates. Raises ValueError
        when the <name> of a '<name>+' entry does not exist and
        Exception when any other entry matches nothing.
    '''
    test_root = args.testhome + '/autotests'

    # Run all tests
    if not args.autotests:
        names = _git_tracked_autotests(args.testhome)
        return sorted(test_root + '/' + t for t in names)

    print("Generating partial test list")

    tests = []
    full_list = sorted(os.listdir(test_root))

    def add(path):
        if path not in tests:
            tests.append(path)

    for t in args.autotests.split(','):
        if t.endswith('+'):
            first = t.split('+')[0]
            if first not in full_list:
                raise ValueError("Could not find test %s" % first)

            # Add to what was already selected instead of replacing it
            for x in full_list[full_list.index(first):]:
                if x.startswith('test'):
                    add(test_root + '/' + x)
            continue

        path = '%s/%s' % (test_root, t)

        if os.path.exists(t):
            add(t)
        elif os.path.exists(path):
            add(path)
        else:
            matches = glob(path)
            if not matches:
                raise Exception("Could not find test %s" % t)

            for m in matches:
                add(m)

    return sorted(tests)

def _git_tracked_autotests(testhome):
    '''
        Return the names of the 'test*' directories under
        autotests/ that are committed in git, in the order git lists
        them.
    '''
    Process(['git', 'config', '--system', '--add', 'safe.directory',
                os.path.normpath(testhome)]).wait()

    # Arguments are passed as a list so a testhome containing spaces or
    # shell meta characters cannot break the command
    listing = subprocess.run(['git', '-C', testhome, 'ls-files', 'autotests/'],
                stdout=subprocess.PIPE, universal_newlines=True).stdout

    names = []
    for line in listing.splitlines():
        fields = line.split('/')
        # The second path component is the test directory
        name = fields[1] if len(fields) > 1 else fields[0]

        # Files of one directory are listed together, so comparing with
        # the previous entry is enough to drop duplicates
        if name.startswith('test') and (not names or names[-1] != name):
            names.append(name)

    return names
| |
# Plain tuple holding the counters and elapsed time of one test; this is
# what gets sent through the multiprocessing results queue
SimpleResult = namedtuple('SimpleResult',
                          ['run', 'failures', 'errors', 'skipped', 'time'])
| |
def start_test(ctx, subtests, rqueue):
    '''
        Run an individual test. 'subtests' are parsed prior to calling
        but these effectively make up a single test. 'rqueue' is the
        results queue which is required since this is using
        multiprocessing.

        ctx - test context; ctx.args.sub_tests (when set) is a list of
              '<file>' or '<file>.<function>' names limiting what runs
        subtests - python test file names to import and run
        rqueue - queue a single SimpleResult is put on when done

        An OSError raised while importing a test module is re-raised
        after dumping some kernel state.
    '''
    run = 0
    errors = 0
    failures = 0
    skipped = 0

    # Function names requested with the <file>.<function> format. This
    # does not change between tests so it is only built once.
    limit_funcs = []
    if ctx.args.sub_tests:
        for i in ctx.args.sub_tests:
            parts = i.split('.')
            if len(parts) == 2:
                limit_funcs.append(parts[1])

    start = time.time()
    #
    # Iterate through each individual python test.
    #
    for s in subtests:
        loader = unittest.TestLoader()
        try:
            module = importlib.import_module(os.path.splitext(s)[0])
        except OSError:
            dbg(subprocess.check_output("cat /proc/buddyinfo", shell=True).decode('utf-8'))
            dbg(subprocess.check_output("dmesg | tail -80", shell=True).decode('utf-8'))
            print(ctx)
            raise

        subtest = loader.loadTestsFromModule(module)

        # The test suite is being (ab)used to get a bit more granularity
        # with individual tests. The 'normal' way to use unittest is to
        # just create a test suite and run them. The problem here is that
        # test results are queued and printed at the very end so its
        # difficult to know *where* a test failed (python gives a stack
        # trace but printing the exception/failure immediately shows
        # where in the debug logs something failed). More so if there are
        # several test functions inside a single python file they run
        # as a single test and it is difficult (again) to know where
        # something failed.

        # Iterating through each python test file
        for test in subtest:
            # Iterating through individual test functions inside a
            # Test() class. The suite is turned into a list so the
            # class can be set up on the first function and torn down
            # on the last one (a suite does not support len()).
            tlist = list(test)
            for index, t in enumerate(tlist):
                func, file = str(t).split(' ')
                #
                # TODO: There may be a better way of doing this
                # but stringifying the test class gives us a string:
                # <function> (<file>.<class>)
                #
                file = file.strip('()').split('.')[0] + '.py'

                # Create an empty result here in case the test fails
                result = TestResult()
                crashed = False

                try:
                    skip = len(limit_funcs) > 0 and func not in limit_funcs

                    # Set up class only on first test
                    if index == 0:
                        if not skip:
                            dbg("%s\n\t%s RUNNING" % (file, str(func)), end='')
                        t.setUpClass()
                    else:
                        if not skip:
                            dbg("\t%s RUNNING" % str(func), end='')

                    sys.__stdout__.flush()

                    name = os.path.basename(os.getcwd())

                    Process.write_separators(name, "\n====== %s:%s:%s ======\n\n" %
                                                (name, file, func))

                    if not skip:
                        # Run test (setUp/tearDown run automatically)
                        result = t()

                    # Tear down class only on last test
                    if index == len(tlist) - 1:
                        t.tearDownClass()

                    if skip:
                        continue
                except unittest.SkipTest:
                    result.skipped.append(t)
                except Exception:
                    dbg('\n%s threw an uncaught exception:' % func)
                    traceback.print_exc(file=sys.__stdout__)
                    crashed = True

                run += result.testsRun
                errors += len(result.errors)
                failures += len(result.failures)
                skipped += len(result.skipped)

                if crashed:
                    # An uncaught exception never reaches 'result'. Count
                    # it as an error (and as a run if the test itself
                    # never got to run) so the totals cannot report the
                    # test as passing.
                    errors += 1
                    if result.testsRun == 0:
                        run += 1

                if len(result.skipped) > 0:
                    dbg(colored(" SKIPPED", "cyan"))
                elif crashed or result.testsRun == 0 or \
                        len(result.errors) > 0 or len(result.failures) > 0:
                    # Judge this function on its own result, not on the
                    # running total of earlier functions
                    dbg(colored(" FAILED", "red"))
                    for e in result.errors:
                        dbg(e[1])
                    for f in result.failures:
                        dbg(f[1])
                else:
                    dbg(colored(" PASSED", "green"))

        # Prevents future test modules with the same name (e.g.
        # connection_test.py) from being loaded from the cache
        sys.modules.pop(module.__name__)

    #
    # The multiprocessing queue is picky with what objects it will serialize
    # and send between processes. Because of this we put the important bits
    # of the result into our own 'SimpleResult' tuple.
    #
    sresult = SimpleResult(run=run, failures=failures, errors=errors,
                            skipped=skipped, time=time.time() - start)
    rqueue.put(sresult)

    # This may not be required since we are manually popping sys.modules
    importlib.invalidate_caches()
| |
def pre_test(ctx, test, copied):
    '''
        Copy test files, start processes, and any other pre test work.

        ctx - test context the processes are started on
        test - path of the autotest directory (becomes the cwd)
        copied - list that every file copied to /tmp is appended to,
                 so the caller can clean up even if this raises

        Returns the sorted list of python test files to run. Raises
        Exception when the directory has no hw.conf.
    '''
    os.chdir(test)

    dbg("\nStarting %s" % colored(os.path.basename(test), "white", attrs=['bold']))
    if not os.path.exists(test + '/hw.conf'):
        raise Exception("No hw.conf found for %s" % test)

    ctx.hw_config = ConfigParser()
    ctx.hw_config.read(test + '/hw.conf')
    #
    # We have two types of test files: tests and everything else. Rather
    # than require each test to specify the files needing to be copied to
    # /tmp (previously 'tmpfs_extra_stuff'), we just copy everything which
    # isn't a test. There is really no reason not to do this as any file
    # present in a test directory should be needed by the test.
    #
    # All files
    files = os.listdir(test)
    # Tests (starts or ends with 'test')
    subtests = [f for f in files if f.startswith('test') or \
                os.path.splitext(f)[0].endswith('test')]
    # Everything else (except .py files)
    to_copy = [f for f in files if f not in subtests and \
                not f.endswith('.py') and f != '__pycache__']
    for f in to_copy:
        if os.path.isdir(f):
            shutil.copytree(f, '/tmp/' + f)
        else:
            shutil.copy(f, '/tmp')
        copied.append(f)

    # Prune down any subtests if needed
    if ctx.args.sub_tests:
        # The argument starts out as a comma separated string and is
        # stored back as a list. Only split it the first time so running
        # several test directories does not call split() on a list.
        if isinstance(ctx.args.sub_tests, str):
            ctx.args.sub_tests = ctx.args.sub_tests.split(',')

        # Handle <file>.<test function> format
        to_run = [x.split('.')[0] for x in ctx.args.sub_tests]
        pruned = []

        for s in subtests:
            no_ext = s
            # Drop the file extension before comparing
            if '.' in s:
                no_ext = s.split('.')[0]

            if no_ext in to_run:
                pruned.append(no_ext + '.py')

        subtests = pruned

    if ctx.args.log:
        ctx.start_process(['iwmon', '--nowiphy'])
    elif ctx.args.monitor:
        ctx.start_process(['iwmon'], outfile=ctx.args.monitor)

    ctx.start_dbus()
    ctx.start_haveged()
    ctx.start_dbus_monitor()
    ctx.start_radios()
    ctx.create_namespaces()
    ctx.start_hostapd()
    ctx.start_wpas_interfaces()
    ctx.start_ofono()

    if ctx.hw_config.getboolean('SETUP', 'start_iwd', fallback=True):
        ctx.start_iwd()

    print(ctx)

    # Lets the test modules be imported by name
    sys.path.insert(1, test)

    return sorted(subtests)
| |
def post_test(ctx, to_copy):
    '''
        Remove copied files, and stop test processes.

        ctx - test context whose processes are stopped
        to_copy - names (relative to /tmp) of the files/directories
                  that were copied for the test

        Also prints and removes valgrind logs when valgrind is enabled
        and removes any unexpected file left behind in /tmp.
    '''
    try:
        for f in to_copy:
            if os.path.isdir('/tmp/' + f):
                shutil.rmtree('/tmp/' + f)
            elif os.path.exists('/tmp/' + f):
                os.remove('/tmp/' + f)

    except Exception:
        print("Exception thrown in post_test")
    finally:
        # Processes are stopped even if the cleanup above failed
        ctx.stop_test_processes()

    if ctx.args.valgrind:
        for f in os.listdir('/tmp'):
            if not f.startswith("valgrind.log."):
                continue

            with open('/tmp/' + f, 'r') as v:
                result = v.read()

            # Don't print out the result if there were no issues
            if "0 errors from 0 contexts" not in result:
                dbg(result)
                dbg("\n")

            os.remove('/tmp/' + f)

    # Special case for when logging is enabled
    if os.path.isfile('/tmp/iwd-tls-debug-server-cert.pem'):
        os.remove('/tmp/iwd-tls-debug-server-cert.pem')

    allowed = ['phonesim.conf', 'certs', 'secrets', 'iwd']
    for f in [f for f in os.listdir('/tmp') if f not in allowed]:
        dbg("File %s was not cleaned up!" % f)
        # Best effort: e.g. directories cannot be removed this way
        try:
            os.remove('/tmp/' + f)
        except OSError:
            pass
| |
def print_results(results, max_timeout):
    '''
        Print a table with one row per test plus a 'Total' row.

        results - dict mapping a test name to a result exposing
                  run, failures, errors, skipped and time
        max_timeout - per-test timeout; a result whose time equals it
                      is shown as 'Timed out', a time of 0 is shown
                      as 'Exception'

        Returns True only when no test failed, timed out or raised
        an exception.
    '''
    table = PrettyTable(['Test', colored('Passed', 'green'), colored('Failed', 'red'), \
                        colored('Skipped', 'cyan'), colored('Time', 'yellow')])

    total_pass = 0
    total_fail = 0
    total_skip = 0
    total_time = 0
    # Set when a test did not finish normally; such tests have no
    # counters to add to the totals but still make the run unsuccessful
    abnormal = False

    for test, result in results.items():

        if result.time == max_timeout:
            failed = "Timed out"
            passed = "Timed out"
            abnormal = True
        elif result.time == 0:
            failed = "Exception"
            passed = "Exception"
            abnormal = True
        else:
            failed = result.failures + result.errors
            passed = result.run - failed

            total_pass += passed
            total_fail += failed
            total_skip += result.skipped

        total_time += result.time

        elapsed = '%.2f' % result.time

        table.add_row([test, colored(passed, 'green'), colored(failed, 'red'), \
                        colored(result.skipped, 'cyan'), colored(elapsed, 'yellow')])

    total_time = '%.2f' % total_time

    table.add_row(['Total', colored(total_pass, 'green'), colored(total_fail, 'red'), \
                    colored(total_skip, 'cyan'), colored(total_time, 'yellow')])

    dbg(table)

    return total_fail == 0 and not abnormal
| |
def run_auto_tests(ctx, args):
    '''
        Run every selected autotest directory in its own process and
        store a SimpleResult per test in ctx.results.

        A test that exceeds its time budget is recorded with
        time=<timeout>; a test whose setup or execution raised is
        recorded with time=0. post_test() always runs afterwards.
    '''
    tests = build_test_list(args)

    for test in tests:
        copied = []
        name = os.path.basename(test)
        try:
            subtests = pre_test(ctx, test, copied)

            if len(subtests) < 1:
                dbg("No tests to run")
                sys.exit()

            rqueue = multiprocessing.Queue()
            p = multiprocessing.Process(target=start_test, args=(ctx, subtests, rqueue))
            p.start()
            # Rather than time each subtest we just time the total but
            # multiply the default time by the number of tests being run.
            p.join(int(args.timeout) * len(subtests))

            if p.is_alive():
                # Timeout
                p.terminate()
                # Reap the child, but do not wait on it forever
                p.join(5)

                ctx.results[name] = SimpleResult(run=0,
                                    failures=0, errors=0,
                                    skipped=0, time=int(args.timeout))
            else:
                # The child may have died without queuing a result. A
                # bounded wait turns that into an exception (handled
                # below) instead of blocking forever.
                ctx.results[name] = rqueue.get(True, 5)

        except Exception:
            dbg("%s threw an uncaught exception" % test)
            traceback.print_exc(file=sys.__stdout__)
            ctx.results[name] = SimpleResult(run=0, failures=0,
                                errors=0, skipped=0, time=0)
        finally:
            post_test(ctx, copied)
| |
def run_unit_tests(ctx, args):
    '''
        Run every selected unit test executable from
        <args.testhome>/unit and print whether it passed, based on
        its exit status.
    '''
    os.chdir(args.testhome + '/unit')
    units = build_unit_list(args)

    for u in units:
        # Keep the process handle: the exit status is read from it and
        # not from whatever wait() happens to return
        proc = ctx.start_process([u])
        proc.wait()

        if proc.returncode != 0:
            dbg("Unit test %s failed" % os.path.basename(u))
        else:
            dbg("Unit test %s passed" % os.path.basename(u))
| |
def run_tests(args):
    '''
        Set up the shared 'config' module for the run and execute
        either the auto tests or, when 'args.unit_tests' is given,
        the unit tests.
    '''
    global config

    os.chdir(args.testhome)

    #
    # Every autotest util (iwd/hostapd/etc) reaches the TestContext
    # through the 'config' module: anything in this interpreter can
    # import config.ctx to see live test information, start/stop
    # processes, look at the active radios etc.
    #
    config = importlib.import_module('config')
    ctx = TestContext(args)
    config.ctx = ctx

    # Must import these after config so ctx gets set
    for module_name in ('hwsim', 'hostapd'):
        setattr(config, module_name, importlib.import_module(module_name))

    # Start writing out kernel log
    ctx.start_process(["dmesg", '--follow'])

    if args.unit_tests is not None:
        run_unit_tests(ctx, args)
    else:
        run_auto_tests(ctx, args)
| |
# Script entry: 'runner' must be a module global since exit_vm() uses it
runner = Runner()

# exit_vm() is registered before the environment is prepared so it runs
# at interpreter exit from then on
atexit.register(exit_vm)
runner.prepare_environment()

if runner.args.start:
    # A program was given to start instead of running tests: bring up
    # D-Bus, export its address and run that program from the test home
    ctx = TestContext(runner.args)
    ctx.start_dbus()
    os.chdir(runner.args.testhome)
    os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = ctx.dbus_address

    subprocess.run([runner.args.start])
else:
    run_tests(runner.args)

runner.cleanup_environment()