| #!/usr/bin/python |
| # SPDX-License-Identifier: LGPL-2.1-or-later |
| |
| import argparse |
| import dbus |
| import dbus.mainloop.glib |
| import dbus.service |
| import json |
| import time |
| |
| from threading import Thread |
| |
| try: |
| from gi.repository import GObject # python3 |
| except ImportError: |
| import gobject as GObject # python2 |
| |
| DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' |
| DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' |
| |
| BLUEZ_SERVICE_NAME = 'org.bluez' |
| |
| ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1' |
| ADV_MONITOR_IFACE = 'org.bluez.AdvertisementMonitor1' |
| ADV_MONITOR_APP_BASE_PATH = '/org/bluez/example/adv_monitor_app' |
| |
| |
| class AdvMonitor(dbus.service.Object): |
| |
| # Indexes of the Monitor object parameters in a monitor data list. |
| MONITOR_TYPE = 0 |
| RSSI_FILTER = 1 |
| PATTERNS = 2 |
| |
| # Indexes of the RSSI filter parameters in a monitor data list. |
| RSSI_H_THRESH = 0 |
| RSSI_H_TIMEOUT = 1 |
| RSSI_L_THRESH = 2 |
| RSSI_L_TIMEOUT = 3 |
| |
| # Indexes of the Patterns filter parameters in a monitor data list. |
| PATTERN_START_POS = 0 |
| PATTERN_AD_TYPE = 1 |
| PATTERN_DATA = 2 |
| |
| def __init__(self, bus, app_path, monitor_id, monitor_data): |
| self.path = app_path + '/monitor' + str(monitor_id) |
| self.bus = bus |
| |
| self._set_type(monitor_data[self.MONITOR_TYPE]) |
| self._set_rssi(monitor_data[self.RSSI_FILTER]) |
| self._set_patterns(monitor_data[self.PATTERNS]) |
| |
| super(AdvMonitor, self).__init__(self.bus, self.path) |
| |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| |
| def get_properties(self): |
| properties = dict() |
| properties['Type'] = dbus.String(self.monitor_type) |
| properties['RSSIHighThreshold'] = dbus.Int16(self.rssi_h_thresh) |
| properties['RSSIHighTimeout'] = dbus.UInt16(self.rssi_h_timeout) |
| properties['RSSILowThreshold'] = dbus.Int16(self.rssi_l_thresh) |
| properties['RSSILowTimeout'] = dbus.UInt16(self.rssi_l_timeout) |
| properties['Patterns'] = dbus.Array(self.patterns, signature='(yyay)') |
| return {ADV_MONITOR_IFACE: properties} |
| |
| |
| def _set_type(self, monitor_type): |
| self.monitor_type = monitor_type |
| |
| |
| def _set_rssi(self, rssi): |
| self.rssi_h_thresh = rssi[self.RSSI_H_THRESH] |
| self.rssi_h_timeout = rssi[self.RSSI_H_TIMEOUT] |
| self.rssi_l_thresh = rssi[self.RSSI_L_THRESH] |
| self.rssi_l_timeout = rssi[self.RSSI_L_TIMEOUT] |
| |
| |
| def _set_patterns(self, patterns): |
| self.patterns = [] |
| for pattern in patterns: |
| start_pos = dbus.Byte(pattern[self.PATTERN_START_POS]) |
| ad_type = dbus.Byte(pattern[self.PATTERN_AD_TYPE]) |
| ad_data = [] |
| for byte in pattern[self.PATTERN_DATA]: |
| ad_data.append(dbus.Byte(byte)) |
| adv_pattern = dbus.Struct((start_pos, ad_type, ad_data), |
| signature='yyay') |
| self.patterns.append(adv_pattern) |
| |
| |
| def remove_monitor(self): |
| self.remove_from_connection() |
| |
| |
| @dbus.service.method(DBUS_PROP_IFACE, |
| in_signature='s', |
| out_signature='a{sv}') |
| def GetAll(self, interface): |
| print('{}: {} GetAll'.format(self.path, interface)) |
| if interface != ADV_MONITOR_IFACE: |
| print('{}: GetAll: Invalid arg {}'.format(self.path, interface)) |
| return {} |
| |
| return self.get_properties()[ADV_MONITOR_IFACE] |
| |
| |
| @dbus.service.method(ADV_MONITOR_IFACE, |
| in_signature='', |
| out_signature='') |
| def Activate(self): |
| print('{}: Monitor Activated'.format(self.path)) |
| |
| |
| @dbus.service.method(ADV_MONITOR_IFACE, |
| in_signature='', |
| out_signature='') |
| def Release(self): |
| print('{}: Monitor Released'.format(self.path)) |
| |
| |
| @dbus.service.method(ADV_MONITOR_IFACE, |
| in_signature='o', |
| out_signature='') |
| def DeviceFound(self, device): |
| print('{}: {} Device Found'.format(self.path, device)) |
| |
| |
| @dbus.service.method(ADV_MONITOR_IFACE, |
| in_signature='o', |
| out_signature='') |
| def DeviceLost(self, device): |
| print('{}: {} Device Lost'.format(self.path, device)) |
| |
| |
| class AdvMonitorApp(dbus.service.Object): |
| |
| def __init__(self, bus, advmon_manager, app_id): |
| self.bus = bus |
| self.advmon_mgr = advmon_manager |
| self.app_path = ADV_MONITOR_APP_BASE_PATH + str(app_id) |
| |
| self.monitors = dict() |
| |
| super(AdvMonitorApp, self).__init__(self.bus, self.app_path) |
| |
| |
| def get_app_path(self): |
| return dbus.ObjectPath(self.app_path) |
| |
| |
| def add_monitor(self, monitor_data): |
| monitor_id = 0 |
| while monitor_id in self.monitors: |
| monitor_id += 1 |
| |
| monitor = AdvMonitor(self.bus, self.app_path, monitor_id, monitor_data) |
| |
| # Emit the InterfacesAdded signal once the Monitor object is created. |
| self.InterfacesAdded(monitor.get_path(), monitor.get_properties()) |
| |
| self.monitors[monitor_id] = monitor |
| |
| return monitor_id |
| |
| |
| def remove_monitor(self, monitor_id): |
| monitor = self.monitors.pop(monitor_id, None) |
| if not monitor: |
| return False |
| |
| # Emit the InterfacesRemoved signal before removing the Monitor object. |
| self.InterfacesRemoved(monitor.get_path(), |
| monitor.get_properties().keys()) |
| |
| monitor.remove_monitor() |
| |
| return True |
| |
| |
| def register_app(self): |
| self.register_successful = None |
| |
| def register_cb(): |
| print('{}: RegisterMonitor successful'.format(self.app_path)) |
| self.register_successful = True |
| |
| def register_error_cb(error): |
| print('{}: RegisterMonitor failed: {}'.format(self.app_path, |
| str(error))) |
| self.register_successful = False |
| |
| self.advmon_mgr.RegisterMonitor(self.get_app_path(), |
| reply_handler=register_cb, |
| error_handler=register_error_cb) |
| |
| # Wait for the reply. |
| while self.register_successful is None: |
| pass |
| |
| return self.register_successful |
| |
| |
| def unregister_app(self): |
| self.unregister_successful = None |
| |
| def unregister_cb(): |
| print('{}: UnregisterMonitor successful'.format(self.app_path)) |
| self.unregister_successful = True |
| |
| def unregister_error_cb(error): |
| print('{}: UnregisterMonitor failed: {}'.format(self.app_path, |
| str(error))) |
| self.unregister_successful = False |
| |
| self.advmon_mgr.UnregisterMonitor(self.get_app_path(), |
| reply_handler=unregister_cb, |
| error_handler=unregister_error_cb) |
| |
| # Wait for the reply. |
| while self.unregister_successful is None: |
| pass |
| |
| return self.unregister_successful |
| |
| |
| @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') |
| def GetManagedObjects(self): |
| print('{}: GetManagedObjects'.format(self.app_path)) |
| objects = dict() |
| for monitor_id in self.monitors: |
| monitor = self.monitors[monitor_id] |
| objects[monitor.get_path()] = monitor.get_properties() |
| |
| return objects |
| |
| |
| @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}') |
| def InterfacesAdded(self, object_path, interfaces_and_properties): |
| # Invoking this method emits the InterfacesAdded signal, |
| # nothing needs to be done here. |
| return |
| |
| |
| @dbus.service.signal(DBUS_OM_IFACE, signature='oas') |
| def InterfacesRemoved(self, object_path, interfaces): |
| # Invoking this method emits the InterfacesRemoved signal, |
| # nothing needs to be done here. |
| return |
| |
| |
| def read_adapter_supported_monitor_types(adapter_props): |
| types = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE, |
| 'SupportedMonitorTypes', |
| dbus_interface=DBUS_PROP_IFACE)) |
| return json.loads(types) |
| |
| |
| def read_adapter_supported_monitor_features(adapter_props): |
| features = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE, |
| 'SupportedFeatures', |
| dbus_interface=DBUS_PROP_IFACE)) |
| return json.loads(features) |
| |
| |
| def print_supported_types_and_features(adapter_props): |
| supported_types = read_adapter_supported_monitor_types(adapter_props) |
| for supported_type in supported_types: |
| print(supported_type) |
| |
| supported_features = read_adapter_supported_monitor_features(adapter_props) |
| for supported_feature in supported_features: |
| print(supported_feature) |
| |
| |
| def find_advmon_mgr(bus, adapter): |
| return dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), |
| ADV_MONITOR_MANAGER_IFACE) |
| |
| |
| def find_adapter(bus): |
| remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), |
| DBUS_OM_IFACE) |
| objects = remote_om.GetManagedObjects() |
| |
| adapter = None |
| adapter_props = None |
| |
| for o, props in objects.items(): |
| if ADV_MONITOR_MANAGER_IFACE in props: |
| adapter = o |
| break |
| |
| if adapter: |
| # Turn on the bluetooth adapter. |
| adapter_props = dbus.Interface( |
| bus.get_object(BLUEZ_SERVICE_NAME, adapter), |
| DBUS_PROP_IFACE) |
| adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1)) |
| |
| return adapter, adapter_props |
| |
| |
| def test(bus, mainloop, advmon_mgr, app_id): |
| # Create an App instance. |
| app = AdvMonitorApp(bus, advmon_mgr, app_id) |
| |
| # Create two monitor objects before registering the app. No Activate() or |
| # Release() should get called yet as the app is not registered. |
| data0 = [ |
| 'invalid_patterns', |
| [-50, 1, -70, 1], |
| [[0, 0x03, [0x12, 0x18]]] # Service Class UUID is 0x1812 (HOG) |
| ] |
| data1 = [ |
| 'or_patterns', |
| [127, 0, 127, 0], |
| [[5, 0x09, [ord('_')]]] # 5th character of the Local Name is '_' |
| ] |
| monitor0 = app.add_monitor(data0) |
| monitor1 = app.add_monitor(data1) |
| |
| # Register the app root path to expose advertisement monitors. |
| # Release() should get called on monitor0 - incorrect monitor type. |
| # Activate() should get called on monitor1. |
| ret = app.register_app() |
| if not ret: |
| print('RegisterMonitor failed.') |
| mainloop.quit() |
| exit(-1) |
| |
| # Create two more monitor objects. |
| # Release() should get called on monitor2 - incorrect RSSI Filter values. |
| # Activate() should get called on monitor3. |
| data2 = [ |
| 'or_patterns', |
| [-50, 1, -30, 1], |
| [[0, 0x19, [0xC2, 0x03]]] # Appearance is 0xC203 (Mouse) |
| ] |
| data3 = [ |
| 'or_patterns', |
| [-50, 1, -70, 1], |
| [[0, 0x03, [0x12, 0x18]], [0, 0x19, [0xC2, 0x03]]] |
| ] |
| monitor2 = app.add_monitor(data2) |
| monitor3 = app.add_monitor(data3) |
| |
| # Run until user hits the 'Enter' key. If any peer device is advertising |
| # during this time, DeviceFound() should get triggered for monitors |
| # matching the advertisements. |
| raw_input('Press "Enter" key to quit...\n') |
| |
| # Remove a monitor. DeviceFound() for this monitor should not get |
| # triggered any more. |
| app.remove_monitor(monitor1) |
| |
| # Unregister the app. Release() should get invoked on active monitors, |
| # monitor3 in this case. |
| app.unregister_app() |
| |
| mainloop.quit() |
| |
| |
| def main(app_id): |
| # Initialize threads in gobject/dbus-glib before creating local threads. |
| GObject.threads_init() |
| dbus.mainloop.glib.threads_init() |
| |
| # Arrange for the GLib main loop to be the default. |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| mainloop = GObject.MainLoop() |
| |
| # Find bluetooth adapter and power it on. |
| adapter, adapter_props = find_adapter(bus) |
| if not adapter or not adapter_props: |
| print('Bluetooth adapter not found.') |
| exit(-1) |
| |
| # Read supported types and find AdvertisementMonitorManager1 interface. |
| print_supported_types_and_features(adapter_props) |
| advmon_mgr = find_advmon_mgr(bus, adapter) |
| if not advmon_mgr : |
| print('AdvertisementMonitorManager1 interface not found.') |
| exit(-1) |
| |
| Thread(target=test, args=(bus, mainloop, advmon_mgr, app_id)).start() |
| |
| mainloop.run() # blocks until mainloop.quit() is called |
| |
| |
| if __name__ == '__main__': |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--app_id', default=0, type=int, help='use this App-ID ' |
| 'for creating dbus objects (default: 0)') |
| args = parser.parse_args() |
| |
| main(args.app_id) |