| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: LGPL-2.1-or-later |
| |
| ################################################################### |
| # |
| # This is a unified test sample for BT Mesh |
| # |
| # To run the test: |
| # test-mesh [token] |
| # |
| # 'token' is an optional argument. It must be a 16-digit |
| # hexadecimal number. The token must be associated with |
| # an existing node. The token is generated and assigned |
| # to a node as a result of successful provisioning (see |
| # explanation of "join" option). |
| # When the token is set, the menu operations "attach" |
| # and "remove" may be performed on a node specified |
| # by this token. |
| # |
| # The test imitates a device with 2 elements: |
| # element 0: OnOff Server model |
| # Sample Vendor model |
| # element 1: OnOff Client model |
| # |
| # The main menu: |
| # token |
| # join |
| # attach |
| # remove |
| # dest |
| # uuid |
| # app-index |
| # client-menu |
| # exit |
| # |
| # The main menu options explained: |
| # token |
| # Set the unique node token. |
| # The token can be set from command line arguments as |
| # well. |
| # |
| # join |
| # Request provisioning of a device to become a node |
| # on a mesh network. The test generates device UUID |
| # which is displayed and will need to be provided to |
| # an outside entity that acts as a Provisioner. Also, |
| # during the provisioning process, an agent that is |
| # part of the test, will request (or will be requested) |
| # to perform a specified operation, e.g., a number will |
| # be displayed and this number will need to be entered |
| # on the Provisioner's side. |
| # In case of successful provisioning, the application |
| # automatically attaches as a node to the daemon. A node |
| # 'token' is returned to the application and is used |
| # for the runtime of the test. |
| # |
| # attach |
| # Attach the application to bluetoothd-daemon as a node. |
| # For the call to be successful, the valid node token must |
| # be already set, either from command arguments or by |
| # executing "set token" operation or automatically after |
| # successfully executing "join" operation in the same |
| # test run. |
| # |
| # remove |
| # Permanently removes any node configuration from daemon |
| # and persistent storage. After this operation, the node |
| # is permanently forgotten by the daemon and the associated |
| # node token is no longer valid. |
| # |
| # dest |
| # Set destination address to send messages: 4 hex digits |
| # |
| # app-index |
| # Set AppKey index to indicate which application key to use |
| # to encode outgoing messages: up to 3 hex digits |
| # |
| # vendor-send |
| # Allows to send an arbitrary endor message. |
| # The destination is set based on previously executed "dest" |
| # command (if not set, the outbound message will fail). |
| # User is prompted to enter hex bytearray payload. |
| # The message is originated from the vendor model registered |
| # on element 0. For the command to succeed, the AppKey index |
| # that is set by executing "app-key" must correspond to the |
| # application key to which the Sample Vendor model is bound. |
| # |
| # client-menu |
| # Enter On/Off client submenu. |
| # |
| # quit |
| # Exits the test. |
| # |
| ################################################################### |
| import sys |
| import struct |
| import fcntl |
| import os |
| import numpy |
| import random |
| import dbus |
| import dbus.service |
| import dbus.exceptions |
| |
| from threading import Timer |
| import time |
| import uuid |
| |
| try: |
| from gi.repository import GLib |
| except ImportError: |
| import glib as GLib |
| from dbus.mainloop.glib import DBusGMainLoop |
| |
| try: |
| from termcolor import colored, cprint |
| set_error = lambda x: colored('!' + x, 'red', attrs=['bold']) |
| set_cyan = lambda x: colored(x, 'cyan', attrs=['bold']) |
| set_green = lambda x: colored(x, 'green', attrs=['bold']) |
| set_yellow = lambda x: colored(x, 'yellow', attrs=['bold']) |
| except ImportError: |
| print('!!! Install termcolor module for better experience !!!') |
| set_error = lambda x: x |
| set_cyan = lambda x: x |
| set_green = lambda x: x |
| set_yellow = lambda x: x |
| |
| # Provisioning agent |
| try: |
| import agent |
| except ImportError: |
| agent = None |
| |
| MESH_SERVICE_NAME = 'org.bluez.mesh' |
| DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' |
| DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' |
| |
| MESH_MGR_IFACE = 'org.bluez.mesh.Management1' |
| MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1' |
| MESH_NODE_IFACE = 'org.bluez.mesh.Node1' |
| MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1' |
| MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1' |
| |
| APP_COMPANY_ID = 0x05f1 |
| APP_PRODUCT_ID = 0x0001 |
| APP_VERSION_ID = 0x0001 |
| |
| VENDOR_ID_NONE = 0xffff |
| |
| TRANSACTION_TIMEOUT = 6 |
| |
| app = None |
| bus = None |
| mainloop = None |
| node = None |
| node_mgr = None |
| mesh_net = None |
| |
| dst_addr = 0x0000 |
| app_idx = 0 |
| |
| # Node token housekeeping |
| token = None |
| have_token = False |
| attached = False |
| |
| # Remote device UUID |
| have_uuid = False |
| remote_uuid = None |
| |
| # Menu housekeeping |
| MAIN_MENU = 0 |
| ON_OFF_CLIENT_MENU = 1 |
| |
| INPUT_NONE = 0 |
| INPUT_TOKEN = 1 |
| INPUT_DEST_ADDRESS = 2 |
| INPUT_APP_KEY_INDEX = 3 |
| INPUT_MESSAGE_PAYLOAD = 4 |
| INPUT_UUID = 5 |
| |
| menus = [] |
| current_menu = None |
| |
| user_input = 0 |
| input_error = False |
| |
| send_opts = dbus.Dictionary(signature='sv') |
| send_opts = {'ForceSegmented' : dbus.Boolean(True)} |
| |
| def raise_error(str_value): |
| global input_error |
| |
| input_error = True |
| print(set_error(str_value)) |
| |
| def clear_error(): |
| global input_error |
| input_error = False |
| |
| def is_error(): |
| return input_error |
| |
| def app_exit(): |
| global mainloop |
| global app |
| |
| for el in app.elements: |
| for model in el.models: |
| if model.timer != None: |
| model.timer.cancel() |
| mainloop.quit() |
| |
| def set_token(str_value): |
| global token |
| global have_token |
| |
| if len(str_value) != 16: |
| raise_error('Expected 16 digits') |
| return |
| |
| try: |
| input_number = int(str_value, 16) |
| except ValueError: |
| raise_error('Not a valid hexadecimal number') |
| return |
| |
| token = numpy.uint64(input_number) |
| have_token = True |
| |
| def set_uuid(str_value): |
| global remote_uuid |
| global have_uuid |
| |
| if len(str_value) != 32: |
| raise_error('Expected 32 digits') |
| return |
| |
| remote_uuid = bytearray.fromhex(str_value) |
| have_uuid = True |
| |
| def array_to_string(b_array): |
| str_value = "" |
| for b in b_array: |
| str_value += "%02x" % b |
| return str_value |
| |
| def generic_error_cb(error): |
| print(set_error('D-Bus call failed: ') + str(error)) |
| |
| def generic_reply_cb(): |
| return |
| |
| def attach_app_error_cb(error): |
| print(set_error('Failed to register application: ') + str(error)) |
| |
| def attach(token): |
| print('Attach mesh node to bluetooth-meshd daemon') |
| |
| mesh_net.Attach(app.get_path(), token, |
| reply_handler=attach_app_cb, |
| error_handler=attach_app_error_cb) |
| |
| def join_cb(): |
| print('Join procedure started') |
| |
| def join_error_cb(reason): |
| print('Join procedure failed: ', reason) |
| |
| def remove_node_cb(): |
| global attached |
| global have_token |
| |
| print(set_yellow('Node removed')) |
| attached = False |
| have_token = False |
| |
| def unwrap(item): |
| if isinstance(item, dbus.Boolean): |
| return bool(item) |
| if isinstance(item, (dbus.UInt16, dbus.Int16, dbus.UInt32, dbus.Int32, |
| dbus.UInt64, dbus.Int64)): |
| return int(item) |
| if isinstance(item, dbus.Byte): |
| return bytes([int(item)]) |
| if isinstance(item, dbus.String): |
| return item |
| if isinstance(item, (dbus.Array, list, tuple)): |
| return [unwrap(x) for x in item] |
| if isinstance(item, (dbus.Dictionary, dict)): |
| return dict([(unwrap(x), unwrap(y)) for x, y in item.items()]) |
| |
| print(set_error('Dictionary item not handled: ') + type(item)) |
| |
| return item |
| |
| def attach_app_cb(node_path, dict_array): |
| global attached |
| |
| attached = True |
| |
| print(set_yellow('Mesh app registered: ') + set_green(node_path)) |
| |
| obj = bus.get_object(MESH_SERVICE_NAME, node_path) |
| |
| global node_mgr |
| node_mgr = dbus.Interface(obj, MESH_MGR_IFACE) |
| |
| global node |
| node = dbus.Interface(obj, MESH_NODE_IFACE) |
| |
| els = unwrap(dict_array) |
| |
| for el in els: |
| idx = struct.unpack('b', el[0])[0] |
| |
| models = el[1] |
| element = app.get_element(idx) |
| element.set_model_config(models) |
| |
| def interfaces_removed_cb(object_path, interfaces): |
| print('Removed') |
| if not mesh_net: |
| return |
| |
| print(object_path) |
| if object_path == mesh_net[2]: |
| print('Service was removed') |
| app_exit() |
| |
| def print_state(state): |
| print('State is ', end='') |
| if state == 0: |
| print('OFF') |
| elif state == 1: |
| print('ON') |
| else: |
| print('UNKNOWN') |
| class ModTimer(): |
| def __init__(self): |
| self.seconds = None |
| self.func = None |
| self.thread = None |
| self.busy = False |
| |
| def _timeout_cb(self): |
| self.func() |
| self.busy = True |
| self._schedule_timer() |
| self.busy =False |
| |
| def _schedule_timer(self): |
| self.thread = Timer(self.seconds, self._timeout_cb) |
| self.thread.start() |
| |
| def start(self, seconds, func): |
| self.func = func |
| self.seconds = seconds |
| if not self.busy: |
| self._schedule_timer() |
| |
| def cancel(self): |
| if self.thread is not None: |
| self.thread.cancel() |
| self.thread = None |
| |
| class Application(dbus.service.Object): |
| |
| def __init__(self, bus): |
| self.path = '/example' |
| self.agent = None |
| self.elements = [] |
| dbus.service.Object.__init__(self, bus, self.path) |
| |
| def set_agent(self, agent): |
| self.agent = agent |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| def add_element(self, element): |
| self.elements.append(element) |
| |
| def get_element(self, idx): |
| for ele in self.elements: |
| if ele.get_index() == idx: |
| return ele |
| |
| def get_properties(self): |
| return { |
| MESH_APPLICATION_IFACE: { |
| 'CompanyID': dbus.UInt16(APP_COMPANY_ID), |
| 'ProductID': dbus.UInt16(APP_PRODUCT_ID), |
| 'VersionID': dbus.UInt16(APP_VERSION_ID) |
| } |
| } |
| |
| @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') |
| def GetManagedObjects(self): |
| response = {} |
| response[self.path] = self.get_properties() |
| response[self.agent.get_path()] = self.agent.get_properties() |
| for element in self.elements: |
| response[element.get_path()] = element.get_properties() |
| return response |
| |
| @dbus.service.method(MESH_APPLICATION_IFACE, |
| in_signature="t", out_signature="") |
| def JoinComplete(self, value): |
| global token |
| global have_token |
| global attach |
| |
| print(set_yellow('Joined mesh network with token ') + |
| set_green(format(value, '016x'))) |
| |
| token = value |
| have_token = True |
| |
| @dbus.service.method(MESH_APPLICATION_IFACE, |
| in_signature="s", out_signature="") |
| def JoinFailed(self, value): |
| print(set_error('JoinFailed '), value) |
| |
| |
| class Element(dbus.service.Object): |
| PATH_BASE = '/example/ele' |
| |
| def __init__(self, bus, index): |
| self.path = self.PATH_BASE + format(index, '02x') |
| self.models = [] |
| self.bus = bus |
| self.index = index |
| dbus.service.Object.__init__(self, bus, self.path) |
| |
| def _get_sig_models(self): |
| mods = [] |
| for model in self.models: |
| opts = [] |
| id = model.get_id() |
| vendor = model.get_vendor() |
| if vendor == VENDOR_ID_NONE: |
| mod = (id, opts) |
| mods.append(mod) |
| return mods |
| |
| def _get_v_models(self): |
| mods = [] |
| for model in self.models: |
| opts = [] |
| id = model.get_id() |
| v = model.get_vendor() |
| if v != VENDOR_ID_NONE: |
| mod = (v, id, opts) |
| mods.append(mod) |
| return mods |
| |
| def get_properties(self): |
| vendor_models = self._get_v_models() |
| sig_models = self._get_sig_models() |
| |
| props = {'Index' : dbus.Byte(self.index)} |
| props['Models'] = dbus.Array(sig_models, signature='(qa{sv})') |
| props['VendorModels'] = dbus.Array(vendor_models, |
| signature='(qqa{sv})') |
| #print(props) |
| return { MESH_ELEMENT_IFACE: props } |
| |
| def add_model(self, model): |
| model.set_path(self.path) |
| self.models.append(model) |
| |
| def get_index(self): |
| return self.index |
| |
| def set_model_config(self, configs): |
| for config in configs: |
| mod_id = config[0] |
| self.update_model_config(mod_id, config[1]) |
| |
| @dbus.service.method(MESH_ELEMENT_IFACE, |
| in_signature="qqvay", out_signature="") |
| def MessageReceived(self, source, key, dest, data): |
| print(('Message Received on Element %02x') % self.index, end='') |
| print(', src=', format(source, '04x'), end='') |
| |
| if isinstance(dest, int): |
| print(', dst=%04x' % dest) |
| elif isinstance(dest, dbus.Array): |
| dst_str = array_to_string(dest) |
| print(', dst=' + dst_str) |
| |
| for model in self.models: |
| model.process_message(source, dest, key, data) |
| |
| @dbus.service.method(MESH_ELEMENT_IFACE, |
| in_signature="qa{sv}", out_signature="") |
| |
| def UpdateModelConfiguration(self, model_id, config): |
| cfg = unwrap(config) |
| print(cfg) |
| self.update_model_config(model_id, cfg) |
| |
| def update_model_config(self, model_id, config): |
| print(('Update Model Config '), end='') |
| print(format(model_id, '04x')) |
| for model in self.models: |
| if model_id == model.get_id(): |
| model.set_config(config) |
| return |
| |
| @dbus.service.method(MESH_ELEMENT_IFACE, |
| in_signature="", out_signature="") |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| class Model(): |
| def __init__(self, model_id): |
| self.cmd_ops = [] |
| self.model_id = model_id |
| self.vendor = VENDOR_ID_NONE |
| self.bindings = [] |
| self.pub_period = 0 |
| self.pub_id = 0 |
| self.path = None |
| self.timer = None |
| |
| def set_path(self, path): |
| self.path = path |
| |
| def get_id(self): |
| return self.model_id |
| |
| def get_vendor(self): |
| return self.vendor |
| |
| def process_message(self, source, dest, key, data): |
| return |
| |
| def set_publication(self, period): |
| self.pub_period = period |
| |
| def send_publication(self, data): |
| pub_opts = dbus.Dictionary(signature='sv') |
| |
| print('Send publication ', end='') |
| print(data) |
| node.Publish(self.path, self.model_id, pub_opts, data, |
| reply_handler=generic_reply_cb, |
| error_handler=generic_error_cb) |
| |
| def send_message(self, dest, key, data): |
| global send_opts |
| |
| node.Send(self.path, dest, key, send_opts, data, |
| reply_handler=generic_reply_cb, |
| error_handler=generic_error_cb) |
| |
| def set_config(self, config): |
| if 'Bindings' in config: |
| self.bindings = config.get('Bindings') |
| print('Bindings: ', end='') |
| print(self.bindings) |
| if 'PublicationPeriod' in config: |
| self.set_publication(config.get('PublicationPeriod')) |
| print('Model publication period ', end='') |
| print(self.pub_period, end='') |
| print(' ms') |
| if 'Subscriptions' in config: |
| print('Model subscriptions ', end='') |
| self.print_subscriptions(config.get('Subscriptions')) |
| print() |
| |
| def print_subscriptions(self, subscriptions): |
| for sub in subscriptions: |
| if isinstance(sub, int): |
| print('%04x,' % sub, end=' ') |
| |
| if isinstance(sub, list): |
| label = uuid.UUID(bytes=b''.join(sub)) |
| print(label, ',', end=' ') |
| |
| ######################## |
| # On Off Server Model |
| ######################## |
| class OnOffServer(Model): |
| def __init__(self, model_id): |
| Model.__init__(self, model_id) |
| self.tid = None |
| self.last_src = 0x0000 |
| self.last_dst = 0x0000 |
| self.cmd_ops = { 0x8201, # get |
| 0x8202, # set |
| 0x8203, # set unacknowledged |
| 0x8204 } # status |
| |
| print("OnOff Server ") |
| self.state = 0 |
| print_state(self.state) |
| self.pub_timer = ModTimer() |
| self.t_timer = ModTimer() |
| |
| def process_message(self, source, dest, key, data): |
| datalen = len(data) |
| |
| if datalen != 2 and datalen != 4: |
| # The opcode is not recognized by this model |
| return |
| |
| if datalen == 2: |
| op_tuple=struct.unpack('>H',bytes(data)) |
| opcode = op_tuple[0] |
| |
| if opcode != 0x8201: |
| # The opcode is not recognized by this model |
| return |
| print('Get state') |
| elif datalen == 4: |
| opcode,self.state, tid = struct.unpack('>HBB', |
| bytes(data)) |
| |
| if opcode != 0x8202 and opcode != 0x8203: |
| # The opcode is not recognized by this model |
| return |
| print_state(self.state) |
| |
| if (self.tid != None and self.tid == tid and |
| self.last_src == source and |
| self.last_dst == dest): |
| # Ignore duplicate transaction |
| return |
| |
| self.t_timer.cancel() |
| self.tid = tid |
| self.last_src = source |
| self.last_dst = dest |
| self.t_timer.start(TRANSACTION_TIMEOUT, self.t_track) |
| |
| # Unacknowledged "set" |
| if opcode == 0x8203: |
| return |
| |
| rsp_data = struct.pack('>HB', 0x8204, self.state) |
| self.send_message(source, key, rsp_data) |
| |
| def t_track(self): |
| self.t_timer.cancel() |
| self.tid = None |
| self.last_src = 0x0000 |
| self.last_dst = 0x0000 |
| |
| def set_publication(self, period): |
| |
| self.pub_period = period |
| if period == 0: |
| self.pub_timer.cancel() |
| return |
| |
| # We do not handle ms in this example |
| if period < 1000: |
| return |
| |
| self.pub_timer.start(period/1000, self.publish) |
| |
| def publish(self): |
| print('Publish') |
| data = struct.pack('>HB', 0x8204, self.state) |
| self.send_publication(data) |
| |
| ######################## |
| # On Off Client Model |
| ######################## |
| class OnOffClient(Model): |
| def __init__(self, model_id): |
| Model.__init__(self, model_id) |
| self.tid = 0 |
| self.data = None |
| self.cmd_ops = { 0x8201, # get |
| 0x8202, # set |
| 0x8203, # set unacknowledged |
| 0x8204 } # status |
| print('OnOff Client') |
| |
| def _send_message(self, dest, key, data): |
| print('OnOffClient send command') |
| self.send_message(dest, key, data) |
| |
| def get_state(self, dest, key): |
| opcode = 0x8201 |
| self.data = struct.pack('>H', opcode) |
| self._send_message(dest, key, self.data) |
| |
| def set_state(self, dest, key, state): |
| opcode = 0x8202 |
| print('Set state:', state) |
| self.data = struct.pack('>HBB', opcode, state, self.tid) |
| self.tid = (self.tid + 1) % 255 |
| self._send_message(dest, key, self.data) |
| |
| def repeat(self, dest, key): |
| if self.data != None: |
| self._send_message(dest, key, self.data) |
| else: |
| print('No previous command stored') |
| |
| def process_message(self, source, dest, key, data): |
| print('OnOffClient process message len = ', end = '') |
| datalen = len(data) |
| print(datalen) |
| |
| if datalen != 3: |
| # The opcode is not recognized by this model |
| return |
| |
| opcode, state = struct.unpack('>HB',bytes(data)) |
| |
| if opcode != 0x8204 : |
| # The opcode is not recognized by this model |
| return |
| |
| print(set_yellow('Got state '), end = '') |
| |
| state_str = "ON" |
| if state == 0: |
| state_str = "OFF" |
| |
| print(set_green(state_str), set_yellow('from'), |
| set_green('%04x' % source)) |
| |
| ######################## |
| # Sample Vendor Model |
| ######################## |
| class SampleVendor(Model): |
| def __init__(self, model_id): |
| Model.__init__(self, model_id) |
| self.vendor = 0x05F1 # Linux Foundation Company ID |
| |
| ######################## |
| # Menu functions |
| ######################## |
| class MenuItem(): |
| def __init__(self, desc, func): |
| self.desc = desc |
| self.func = func |
| |
| class Menu(): |
| def __init__(self, title, menu): |
| self.title = title |
| self.menu = menu |
| |
| def show(self): |
| print(set_cyan('*** ' + self.title.upper() + ' ***')) |
| for k, v in self.menu.items(): |
| print(set_green(k), set_cyan(v.desc)) |
| |
| def process_cmd(self, str_value): |
| if is_error(): |
| self.show() |
| clear_error() |
| return |
| |
| cmds = [] |
| for key in self.menu.keys(): |
| if key.startswith(str_value): |
| cmds.append(key) |
| |
| if len(cmds) == 0: |
| print(set_error('Unknown menu option: '), str_value) |
| self.show() |
| return |
| if len(cmds) > 1: |
| for cmd in cmds: |
| print(set_cyan(cmd + '?')) |
| return |
| |
| self.menu.get(cmds[0]).func() |
| |
| class MenuHandler(object): |
| def __init__(self, callback): |
| self.cb = callback |
| flags = fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) |
| flags |= os.O_NONBLOCK |
| fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, flags) |
| sys.stdin.flush() |
| GLib.io_add_watch(sys.stdin, GLib.IO_IN, self.input_callback) |
| |
| def input_callback(self, fd, condition): |
| chunk = fd.read() |
| buffer = '' |
| for char in chunk: |
| buffer += char |
| if char == '\n': |
| self.cb(buffer) |
| |
| return True |
| |
| def process_input(input_str): |
| str_value = input_str.strip() |
| |
| # Allow entering empty lines for better output visibility |
| if len(str_value) == 0: |
| return |
| |
| current_menu.process_cmd(str_value) |
| |
| def switch_menu(level): |
| global current_menu |
| |
| if level >= len(menus): |
| return |
| |
| current_menu = menus[level] |
| current_menu.show() |
| |
| ######################## |
| # Main menu class |
| ######################## |
| class MainMenu(Menu): |
| def __init__(self): |
| menu_items = { |
| 'token': MenuItem(' - set node ID (token)', |
| self.__cmd_set_token), |
| 'join': MenuItem(' - join mesh network', |
| self.__cmd_join), |
| 'attach': MenuItem(' - attach mesh node', |
| self.__cmd_attach), |
| 'remove': MenuItem(' - delete node', |
| self.__cmd_remove), |
| 'dest': MenuItem(' - set destination address', |
| self.__cmd_set_dest), |
| 'uuid': MenuItem(' - set remote uuid', |
| self.__cmd_set_uuid), |
| 'app-index': MenuItem(' - set AppKey index', |
| self.__cmd_set_app_idx), |
| 'vendor-send': MenuItem(' - send raw vendor message', |
| self.__cmd_vendor_msg), |
| 'client-menu': MenuItem(' - On/Off client menu', |
| self.__cmd_client_menu), |
| 'quit': MenuItem(' - exit the test', app_exit) |
| } |
| |
| Menu.__init__(self, 'Main Menu', menu_items) |
| |
| def __cmd_client_menu(self): |
| if attached != True: |
| print(set_error('Disallowed: node is not attached')) |
| return |
| switch_menu(ON_OFF_CLIENT_MENU) |
| |
| def __cmd_set_token(self): |
| global user_input |
| |
| if have_token == True: |
| print('Token already set') |
| return |
| |
| user_input = INPUT_TOKEN |
| print(set_cyan('Enter 16-digit hex node ID:')) |
| |
| def __cmd_set_dest(self): |
| global user_input |
| |
| user_input = INPUT_DEST_ADDRESS |
| print(set_cyan('Enter 4-digit hex destination address:')) |
| |
| def __cmd_set_uuid(self): |
| global user_input |
| |
| user_input = INPUT_UUID |
| print(set_cyan('Enter 32-digit hex remote UUID:')) |
| |
| def __cmd_set_app_idx(self): |
| global user_input |
| |
| user_input = INPUT_APP_KEY_INDEX; |
| print(set_cyan('Enter app key index (up to 3 digit hex):')) |
| |
| def __cmd_vendor_msg(self): |
| global user_input |
| |
| user_input = INPUT_MESSAGE_PAYLOAD; |
| print(set_cyan('Enter message payload (hex):')) |
| |
| def __cmd_join(self): |
| if agent == None: |
| print(set_error('Provisioning agent not found')) |
| return |
| |
| uuid_bytes = uuid.uuid4().bytes |
| uuid_str = array_to_string(uuid_bytes) |
| |
| print(set_yellow('Joining with UUID ') + set_green(uuid_str)) |
| mesh_net.Join(app.get_path(), uuid_bytes, |
| reply_handler=join_cb, |
| error_handler=join_error_cb) |
| |
| def __cmd_attach(self): |
| if have_token == False: |
| print(set_error('Token is not set')) |
| self.show() |
| return |
| |
| attach(token) |
| |
| def __cmd_remove(self): |
| if have_token == False: |
| print(set_error('Token is not set')) |
| self.show() |
| return |
| |
| print('Removing mesh node') |
| mesh_net.Leave(token, reply_handler=remove_node_cb, |
| error_handler=generic_error_cb) |
| |
| def __send_vendor_msg(self, str_value): |
| try: |
| msg_data = bytearray.fromhex(str_value) |
| except ValueError: |
| raise_error('Not a valid hexadecimal input') |
| return |
| |
| print(set_yellow('Send data: ' + set_green(str_value))) |
| app.elements[0].models[1].send_message(dst_addr, app_idx, |
| msg_data) |
| |
| def process_cmd(self, str_value): |
| global user_input |
| global dst_addr |
| global app_idx |
| |
| if user_input == INPUT_TOKEN: |
| set_token(str_value) |
| elif user_input == INPUT_UUID: |
| set_uuid(str_value) |
| elif user_input == INPUT_DEST_ADDRESS: |
| res = set_value(str_value, 4, 4) |
| if is_error() != True: |
| dst_addr = res |
| print(set_yellow("Destination address: ") + |
| set_green(format(dst_addr, '04x'))) |
| elif user_input == INPUT_APP_KEY_INDEX: |
| res = set_value(str_value, 1, 3) |
| if is_error() != True: |
| app_idx = res |
| print(set_yellow("Application index: ") + |
| set_green(format(app_idx, '03x'))) |
| elif user_input == INPUT_MESSAGE_PAYLOAD: |
| self.__send_vendor_msg(str_value) |
| |
| if user_input != INPUT_NONE: |
| user_input = INPUT_NONE |
| if is_error() != True: |
| return |
| |
| Menu.process_cmd(self, str_value) |
| |
| ############################## |
| # On/Off Client menu class |
| ############################## |
| class ClientMenu(Menu): |
| def __init__(self): |
| menu_items = { |
| 'get-state': MenuItem(' - get server state', |
| self.__cmd_get_state), |
| 'off': MenuItem(' - set state OFF', |
| self.__cmd_set_state_off), |
| 'on': MenuItem(' - set state ON', |
| self.__cmd_set_state_on), |
| 'repeat': MenuItem(' - repeat last command', |
| self.__cmd_repeat_transaction), |
| 'back': MenuItem(' - back to main menu', |
| self.__cmd_main_menu), |
| 'quit': MenuItem(' - exit the test', app_exit) |
| } |
| |
| Menu.__init__(self, 'On/Off Client Menu', menu_items) |
| |
| def __cmd_main_menu(self): |
| switch_menu(MAIN_MENU) |
| |
| def __cmd_get_state(self): |
| app.elements[1].models[0].get_state(dst_addr, app_idx) |
| |
| def __cmd_set_state_off(self): |
| app.elements[1].models[0].set_state(dst_addr, app_idx, 0) |
| |
| def __cmd_set_state_on(self): |
| app.elements[1].models[0].set_state(dst_addr, app_idx, 1) |
| |
| def __cmd_repeat_transaction(self): |
| app.elements[1].models[0].repeat(dst_addr, app_idx) |
| |
| def set_value(str_value, min, max): |
| |
| if len(str_value) > max or len(str_value) < min: |
| raise_error('Bad input length %d' % len(str_value)) |
| return -1 |
| |
| try: |
| value = int(str_value, 16) |
| except ValueError: |
| raise_error('Not a valid hexadecimal number') |
| return -1 |
| |
| return value |
| |
| |
| ######################## |
| # Main entry |
| ######################## |
| def main(): |
| |
| DBusGMainLoop(set_as_default=True) |
| |
| global bus |
| bus = dbus.SystemBus() |
| global mainloop |
| global app |
| global mesh_net |
| global menu |
| global current_menu |
| |
| if len(sys.argv) > 1 : |
| set_token(sys.argv[1]) |
| |
| mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME, |
| "/org/bluez/mesh"), |
| MESH_NETWORK_IFACE) |
| |
| mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb) |
| |
| app = Application(bus) |
| |
| # Provisioning agent |
| if agent != None: |
| app.set_agent(agent.Agent(bus)) |
| |
| first_ele = Element(bus, 0x00) |
| second_ele = Element(bus, 0x01) |
| |
| print(set_yellow('Register OnOff Server model on element 0')) |
| first_ele.add_model(OnOffServer(0x1000)) |
| |
| print(set_yellow('Register Vendor model on element 0')) |
| first_ele.add_model(SampleVendor(0x0001)) |
| |
| print(set_yellow('Register OnOff Client model on element 1')) |
| second_ele.add_model(OnOffClient(0x1001)) |
| |
| app.add_element(first_ele) |
| app.add_element(second_ele) |
| |
| mainloop = GLib.MainLoop() |
| |
| menus.append(MainMenu()) |
| menus.append(ClientMenu()) |
| switch_menu(MAIN_MENU) |
| |
| event_catcher = MenuHandler(process_input); |
| mainloop.run() |
| |
| if __name__ == '__main__': |
| main() |