| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: LGPL-2.1-or-later |
| |
| import sys |
| import struct |
| import numpy |
| import dbus |
| import dbus.service |
| import dbus.exceptions |
| |
| from threading import Timer |
| import time |
| |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| from dbus.mainloop.glib import DBusGMainLoop |
| |
| import agent |
| |
# Bus name passed to every bus.get_object() call in this file.
MESH_SERVICE_NAME = 'org.bluez.mesh'
# Standard D-Bus interface names.  DBUS_OM_IFACE is the interface that
# Application.GetManagedObjects is exported on; DBUS_PROP_IFACE is not
# referenced by the code visible in this file.
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'

# Mesh interface names used for proxies (Network1, Node1) and for the
# objects this script exports (Application1, Element1).
MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'

# Values reported as the CompanyID / ProductID / VersionID properties by
# Application.get_properties().
APP_COMPANY_ID = 0x05f1
APP_PRODUCT_ID = 0x0001
APP_VERSION_ID = 0x0001

# Vendor value carried by models that have no vendor; Element uses it to
# split its models into the 'Models' and 'VendorModels' properties.
VENDOR_ID_NONE = 0xffff

# Module-level state shared between the callbacks below.
# mesh_net, app, bus and mainloop are assigned in main();
# node is assigned in attach_app_cb().
mesh_net = None
app = None
bus = None
mainloop = None
node = None

# Assigned by Application.JoinComplete() with the value it receives.
token = None
| |
def generic_error_cb(error):
    """Print the failure reported for an asynchronous D-Bus call.

    Used as the error_handler for node.Send() and node.Publish().
    """
    print('D-Bus call failed: ', str(error), sep='')
| |
def generic_reply_cb():
    """Print a note that an asynchronous D-Bus call returned.

    Used as the reply_handler for node.Send() and node.Publish().
    """
    done_message = 'D-Bus call done'
    print(done_message)
| |
def unwrap(item):
    """Recursively convert a dbus-python value into plain Python objects.

    Conversions performed:
      * dbus.Boolean            -> bool
      * 16/32/64-bit integers   -> int
      * dbus.Byte               -> a one-byte ``bytes`` object
      * dbus.String             -> returned unchanged
      * arrays, lists, tuples   -> list of unwrapped members
      * dictionaries            -> dict with unwrapped keys and values

    Any other type is reported on stdout and returned unchanged.
    """
    integer_types = (dbus.UInt16, dbus.Int16,
                     dbus.UInt32, dbus.Int32,
                     dbus.UInt64, dbus.Int64)
    sequence_types = (dbus.Array, list, tuple)
    mapping_types = (dbus.Dictionary, dict)

    if isinstance(item, dbus.Boolean):
        return bool(item)
    if isinstance(item, integer_types):
        return int(item)
    if isinstance(item, dbus.Byte):
        return bytes([int(item)])
    if isinstance(item, dbus.String):
        return item
    if isinstance(item, sequence_types):
        return [unwrap(member) for member in item]
    if isinstance(item, mapping_types):
        return {unwrap(key): unwrap(value) for key, value in item.items()}

    # Nothing matched: report the type and hand the value back as-is.
    print('Dictionary item not handled')
    print(type(item))
    return item
| |
def join_cb():
    """Reply handler for mesh_net.Join(): report that joining has begun."""
    started_message = 'Join procedure started'
    print(started_message)
| |
def join_error_cb(reason):
    """Error handler for mesh_net.Join(): print the failure reason."""
    prefix = 'Join procedure failed: '
    print(prefix, reason)
| |
def attach_app_cb(node_path, dict_array):
    """Reply handler for mesh_net.Attach(): bind to the node and apply config.

    Creates the Node1 proxy for ``node_path`` and stores it in the global
    ``node``, then hands each element's model configuration to the matching
    local Element.

    Args:
        node_path: object path of the node returned by Attach().
        dict_array: per-element configuration.  After unwrap() each entry is
            indexed as ``entry[0]`` (the element index, a one-byte ``bytes``
            object) and ``entry[1]`` (the model configurations passed to
            Element.set_model_config()).
    """
    global node

    print('Mesh application registered ', node_path)

    obj = bus.get_object(MESH_SERVICE_NAME, node_path)
    node = dbus.Interface(obj, MESH_NODE_IFACE)

    els = unwrap(dict_array)
    print("Get Elements")

    for el in els:
        # The index arrives as a D-Bus byte, which is unsigned; decoding it
        # as signed ('b') would turn indexes above 127 into negative values.
        idx = struct.unpack('B', el[0])[0]
        print('Configuration for Element ', end='')
        print(idx)

        element = app.get_element(idx)
        if element is None:
            # No local element carries this index; skip it instead of
            # crashing on None.set_model_config().
            print('No local element with index ', idx)
            continue
        element.set_model_config(el[1])
| |
def attach_app_error_cb(error):
    """Error handler for mesh_net.Attach(): report the error and stop.

    Quits the global main loop after printing the error.
    """
    print('Failed to register application: ', str(error), sep='')
    mainloop.quit()
| |
def attach(token):
    """Call Attach() on the mesh network for this application.

    The call is asynchronous: attach_app_cb runs on success and
    attach_app_error_cb on failure.

    Args:
        token: value forwarded unchanged as the second Attach() argument.
    """
    print('Attach')
    handlers = {
        'reply_handler': attach_app_cb,
        'error_handler': attach_app_error_cb,
    }
    mesh_net.Attach(app.get_path(), token, **handlers)
| |
def interfaces_removed_cb(object_path, interfaces):
    """Signal handler: quit the main loop when the mesh service object goes.

    Args:
        object_path: path of the object whose interfaces were removed.
        interfaces: names of the removed interfaces (not inspected).
    """
    if mesh_net is None:
        return

    # mesh_net is a dbus.Interface proxy (created in main()), which cannot be
    # indexed; the path it is bound to is exposed as its object_path property.
    if object_path == mesh_net.object_path:
        print('Service was removed')
        mainloop.quit()
| |
def send_response(path, dest, key, data):
    """Print ``data`` and pass it to Send() on the global ``node`` proxy.

    The call is asynchronous; generic_reply_cb / generic_error_cb report the
    outcome.  All four arguments are forwarded to node.Send() unchanged.
    """
    print('send response ', data, sep='')
    handlers = {
        'reply_handler': generic_reply_cb,
        'error_handler': generic_error_cb,
    }
    node.Send(path, dest, key, data, **handlers)
| |
def send_publication(path, model_id, data):
    """Print ``data`` and pass it to Publish() on the global ``node`` proxy.

    The call is asynchronous; generic_reply_cb / generic_error_cb report the
    outcome.  All three arguments are forwarded to node.Publish() unchanged.
    """
    print('send publication ', data, sep='')
    handlers = {
        'reply_handler': generic_reply_cb,
        'error_handler': generic_error_cb,
    }
    node.Publish(path, model_id, data, **handlers)
| |
class PubTimer():
    """Repeating timer built on one-shot threading.Timer objects.

    After start(), ``func`` is called every ``seconds`` seconds from a timer
    thread until cancel() is called.
    """

    def __init__(self):
        self.seconds = None   # interval handed to threading.Timer
        self.func = None      # callable invoked on every expiry
        self.thread = None    # the currently armed Timer, if any
        self.busy = False     # True only while _timeout_cb re-arms the timer

    def _timeout_cb(self):
        """Run the callback, then arm the next one-shot timer."""
        self.func()
        # While busy is set, start() will not arm a second timer of its own.
        self.busy = True
        self._schedule_timer()
        self.busy = False

    def _schedule_timer(self):
        """Create and start a one-shot timer for the current interval."""
        self.thread = Timer(self.seconds, self._timeout_cb)
        self.thread.start()

    def start(self, seconds, func):
        """Start (or restart) periodic invocation of ``func``.

        Args:
            seconds: interval between invocations.
            func: zero-argument callable to invoke.
        """
        self.func = func
        self.seconds = seconds
        if not self.busy:
            # Drop a timer that is still pending; otherwise it would keep
            # firing alongside the new one and could no longer be cancelled
            # because self.thread is about to be overwritten.
            if self.thread is not None:
                self.thread.cancel()
            self._schedule_timer()

    def cancel(self):
        """Cancel the pending timer, if there is one."""
        print('Cancel timer')
        if self.thread is not None:
            print('Cancel thread')
            self.thread.cancel()
            self.thread = None
| |
class Application(dbus.service.Object):
    """D-Bus object exported at '/example' that represents this application.

    Holds the provisioning agent and the list of elements, answers
    GetManagedObjects, and receives the JoinComplete / JoinFailed calls.
    """

    def __init__(self, bus):
        self.path = '/example'
        self.agent = None
        self.elements = []
        dbus.service.Object.__init__(self, bus, self.path)

    def set_agent(self, agent):
        """Remember the agent whose properties GetManagedObjects reports."""
        self.agent = agent

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_element(self, element):
        """Append ``element`` to the list of elements of this application."""
        self.elements.append(element)

    def get_element(self, idx):
        """Return the element whose get_index() equals ``idx``, else None."""
        for ele in self.elements:
            if ele.get_index() == idx:
                return ele
        return None

    def get_properties(self):
        """Return the Application1 properties keyed by interface name."""
        return {
            MESH_APPLICATION_IFACE: {
                'CompanyID': dbus.UInt16(APP_COMPANY_ID),
                'ProductID': dbus.UInt16(APP_PRODUCT_ID),
                'VersionID': dbus.UInt16(APP_VERSION_ID)
            }
        }

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return path -> properties for the app, its agent and elements."""
        response = {}
        print('GetManagedObjects')
        response[self.path] = self.get_properties()
        # The agent is optional until set_agent() has been called.
        if self.agent is not None:
            response[self.agent.get_path()] = self.agent.get_properties()
        for element in self.elements:
            response[element.get_path()] = element.get_properties()
        return response

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="t", out_signature="")
    def JoinComplete(self, value):
        """Store the received value as the global token and attach with it."""
        global token
        print('JoinComplete ', value)

        token = value
        attach(token)

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="s", out_signature="")
    def JoinFailed(self, value):
        """Report a failed join; ``value`` is the string that was received."""
        # The received string is not a token, so nothing is stored here.
        print('JoinFailed ', value)
| |
class Element(dbus.service.Object):
    """D-Bus object for one element, exported at PATH_BASE + 2-digit hex index.

    Owns a list of models, exposes them through the Element1 properties and
    forwards received messages and configuration updates to them.
    """

    PATH_BASE = '/example/ele'

    def __init__(self, bus, index):
        self.path = self.PATH_BASE + format(index, '02x')
        print(self.path)
        self.models = []
        self.bus = bus
        self.index = index
        dbus.service.Object.__init__(self, bus, self.path)

    def _get_sig_models(self):
        """Return the ids of the models whose vendor is VENDOR_ID_NONE."""
        return [model.get_id() for model in self.models
                if model.get_vendor() == VENDOR_ID_NONE]

    def _get_v_models(self):
        """Return (vendor, id) pairs for the models that have a vendor."""
        return [(model.get_vendor(), model.get_id()) for model in self.models
                if model.get_vendor() != VENDOR_ID_NONE]

    def get_properties(self):
        """Return the Element1 properties keyed by interface name."""
        vendor_models = self._get_v_models()
        sig_models = self._get_sig_models()

        return {
            MESH_ELEMENT_IFACE: {
                'Index': dbus.Byte(self.index),
                'Models': dbus.Array(sig_models, 'q'),
                'VendorModels': dbus.Array(vendor_models, '(qq)'),
            }
        }

    def add_model(self, model):
        """Give ``model`` this element's path and add it to the element."""
        model.set_path(self.path)
        self.models.append(model)

    def get_index(self):
        """Return the element index given at construction."""
        return self.index

    def set_model_config(self, configs):
        """Apply each (model id, config) entry of ``configs`` to its model."""
        print('Set element models config')
        for config in configs:
            mod_id = config[0]
            self.UpdateModelConfiguration(mod_id, config[1])

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qqvay", out_signature="")
    def MessageReceived(self, source, key, destination, data):
        """Log an incoming message and offer it to every model."""
        # The three values must be passed as one tuple; applying '%' to
        # self.index alone raises TypeError (not enough arguments).
        print('Message Received on Element %d, src=%04x, dst=%s' %
              (self.index, source, destination))
        for model in self.models:
            model.process_message(source, key, data)

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qa{sv}", out_signature="")
    def UpdateModelConfiguration(self, model_id, config):
        """Pass ``config`` to the first model whose id equals ``model_id``."""
        print('UpdateModelConfig ', end='')
        print(hex(model_id))
        for model in self.models:
            if model_id == model.get_id():
                model.set_config(config)
                return

    # Plain Python helper used by Application.GetManagedObjects().  It is
    # deliberately not decorated with dbus.service.method: it returns an
    # object path, which does not match an empty out_signature.
    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)
| |
class Model():
    """Base class for a model hosted on an element.

    Stores the model id, vendor (VENDOR_ID_NONE by default), bindings and
    publication period; subclasses override process_message() and
    set_publication().
    """

    def __init__(self, model_id):
        self.cmd_ops = []
        self.model_id = model_id
        self.vendor = VENDOR_ID_NONE
        self.bindings = []
        self.pub_period = 0
        self.pub_id = 0
        self.path = None

    def set_path(self, path):
        """Record the path of the element that owns this model."""
        self.path = path

    def get_id(self):
        """Return the model id."""
        return self.model_id

    def get_vendor(self):
        """Return the vendor value (VENDOR_ID_NONE unless changed)."""
        return self.vendor

    def process_message(self, source, key, data):
        """Handle an incoming message; the base class only logs it."""
        print('Model process message')

    def set_publication(self, period):
        """Store the publication period (printed as ms by set_config)."""
        self.pub_period = period

    def set_config(self, config):
        """Apply the recognised keys of ``config`` and print them.

        Recognised keys: 'Bindings', 'PublicationPeriod', 'Subscriptions'.
        """
        if 'Bindings' in config:
            self.bindings = config.get('Bindings')
            print('Bindings: ', end='')
            print(self.bindings)
        if 'PublicationPeriod' in config:
            self.set_publication(config.get('PublicationPeriod'))
            print('Model publication period ', end='')
            print(self.pub_period, end='')
            print(' ms')
        if 'Subscriptions' in config:
            self.print_subscriptions(config.get('Subscriptions'))

    def print_subscriptions(self, subscriptions):
        """Print all subscriptions on a single line.

        Integer entries are printed as 4-digit hex; list entries are joined
        into 16 bytes and printed as a UUID.
        """
        # Imported here because the module does not import uuid at the top;
        # without it the list branch below raises NameError.
        import uuid

        print('Model subscriptions ', end='')
        for sub in subscriptions:
            if isinstance(sub, int):
                print('%04x' % sub, end=' ')
            elif isinstance(sub, list):
                label = uuid.UUID(bytes=b''.join(sub))
                print(label, end=' ')
        print()
| |
class OnOffServer(Model):
    """OnOff server model: keeps one state value and answers get/set.

    Every accepted message is answered with opcode 0x8204 and the current
    state.  When a publication period of at least 1000 ms is configured the
    state is also published periodically through a PubTimer.
    """

    _OP_GET = 0x8201
    _OP_SET = 0x8202
    _OP_SET_UNACK = 0x8203
    _OP_STATUS = 0x8204

    def __init__(self, model_id):
        super().__init__(model_id)
        self.cmd_ops = {self._OP_GET,        # get
                        self._OP_SET,        # set
                        self._OP_SET_UNACK}  # set unacknowledged

        print("OnOff Server ", end="")
        self.state = 0
        print('State ', end='')
        self.timer = PubTimer()

    def process_message(self, source, key, data):
        """Handle a get (2 bytes) or set (3 bytes) message and reply.

        Messages of any other length, or with an unexpected opcode, are
        ignored and leave the state untouched.
        """
        datalen = len(data)
        print('OnOff Server process message len ', datalen)

        # NOTE(review): the opcode is decoded and encoded little-endian
        # ('<H'); confirm this matches the byte order the daemon delivers.
        if datalen == 2:
            opcode = struct.unpack('<H', bytes(data))[0]
            if opcode != self._OP_GET:
                print(hex(opcode))
                return
            print('Get state')
        elif datalen == 3:
            # Unpack into a local first so that a message with an unknown
            # opcode cannot overwrite the stored state.
            opcode, new_state = struct.unpack('<HB', bytes(data))
            if opcode not in (self._OP_SET, self._OP_SET_UNACK):
                print(hex(opcode))
                return
            self.state = new_state
            print('Set state: ', end='')
            print(self.state)
        else:
            return

        # NOTE(review): a status is sent for "set unacknowledged" as well;
        # confirm whether that opcode should be answered at all.
        rsp_data = struct.pack('<HB', self._OP_STATUS, self.state)
        send_response(self.path, source, key, rsp_data)

    def publish(self):
        """Publish the current state as a single byte."""
        print('Publish')
        data = struct.pack('B', self.state)
        send_publication(self.path, self.model_id, data)

    def set_publication(self, period):
        """Start, restart or stop periodic publication.

        Args:
            period: publication period in ms.  0 stops publication; values
                below 1000 are ignored; anything else (re)starts the timer
                with ``period / 1000`` seconds.
        """
        if period == 0:
            self.pub_period = 0
            self.timer.cancel()
            return

        # We do not handle ms in this example
        if period < 1000:
            return

        self.pub_period = period
        self.timer.start(period/1000, self.publish)
| |
def main():
    """Set up D-Bus, export the application objects and join the mesh.

    Creates the Network1 proxy, an Application with one provisioning agent
    and one element (index 0) hosting an OnOffServer with model id 0x1000,
    issues Join() with a fixed 16-byte UUID and then runs the GLib main loop.
    """
    global bus
    global mainloop
    global app
    global mesh_net

    DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
                                             "/org/bluez/mesh"),
                              MESH_NETWORK_IFACE)
    # NOTE(review): the signal is subscribed on the Network1 proxy; confirm
    # that 'InterfacesRemoved' is actually emitted on this interface.
    mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    app = Application(bus)
    prov_agent = agent.Agent(bus)
    app.set_agent(prov_agent)
    first_ele = Element(bus, 0x00)
    first_ele.add_model(OnOffServer(0x1000))
    app.add_element(first_ele)

    mainloop = GObject.MainLoop()

    print('Join')
    # Named dev_uuid so the local does not shadow the stdlib uuid module.
    dev_uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
    print(dev_uuid)
    mesh_net.Join(app.get_path(), dev_uuid,
                  reply_handler=join_cb,
                  error_handler=join_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()