| #!/usr/bin/env python3 |
| # SPDX-License-Identifier: LGPL-2.1-or-later |
| |
| import dbus |
| import dbus.exceptions |
| import dbus.mainloop.glib |
| import dbus.service |
| |
| try: |
| from gi.repository import GObject |
| except ImportError: |
| import gobject as GObject |
| import sys |
| |
| mainloop = None |
| app = None |
| bus = None |
| |
| BLUEZ_SERVICE_NAME = 'org.bluez' |
| DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' |
| DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' |
| |
| BATTERY_PROVIDER_MANAGER_IFACE = 'org.bluez.BatteryProviderManager1' |
| BATTERY_PROVIDER_IFACE = 'org.bluez.BatteryProvider1' |
| BATTERY_PROVIDER_PATH = '/path/to/provider' |
| |
| BATTERY_PATH1 = '11_11_11_11_11_11' |
| BATTERY_PATH2 = '22_22_22_22_22_22' |
| BATTERY_PATH3 = '33_33_33_33_33_33' |
| |
| class InvalidArgsException(dbus.exceptions.DBusException): |
| _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs' |
| |
| |
| class Application(dbus.service.Object): |
| def __init__(self, bus): |
| self.path = BATTERY_PROVIDER_PATH |
| self.services = [] |
| self.batteries = [] |
| dbus.service.Object.__init__(self, bus, self.path) |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| def add_battery(self, battery): |
| self.batteries.append(battery) |
| self.InterfacesAdded(battery.get_path(), battery.get_properties()) |
| GObject.timeout_add(1000, drain_battery, battery) |
| |
| def remove_battery(self, battery): |
| self.batteries.remove(battery) |
| self.InterfacesRemoved(battery.get_path(), [BATTERY_PROVIDER_IFACE]) |
| |
| @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}') |
| def GetManagedObjects(self): |
| response = {} |
| print('GetManagedObjects called') |
| |
| for battery in self.batteries: |
| response[battery.get_path()] = battery.get_properties() |
| |
| return response |
| |
| @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}') |
| def InterfacesAdded(self, object_path, interfaces_and_properties): |
| return |
| |
| @dbus.service.signal(DBUS_OM_IFACE, signature='oas') |
| def InterfacesRemoved(self, object_path, interfaces): |
| return |
| |
| |
| class Battery(dbus.service.Object): |
| """ |
| org.bluez.BatteryProvider1 interface implementation |
| """ |
| def __init__(self, bus, dev, percentage, source = None): |
| self.path = BATTERY_PROVIDER_PATH + '/dev_' + dev |
| self.dev_path = '/org/bluez/hci0/dev_' + dev |
| self.bus = bus |
| self.percentage = percentage |
| self.source = source |
| dbus.service.Object.__init__(self, bus, self.path) |
| |
| def get_battery_properties(self): |
| properties = {} |
| if self.percentage != None: |
| properties['Percentage'] = dbus.Byte(self.percentage) |
| if self.source != None: |
| properties['Source'] = self.source |
| properties['Device'] = dbus.ObjectPath(self.dev_path) |
| return properties |
| |
| def get_properties(self): |
| return { BATTERY_PROVIDER_IFACE: self.get_battery_properties() } |
| |
| def get_path(self): |
| return dbus.ObjectPath(self.path) |
| |
| def set_percentage(self, percentage): |
| if percentage < 0 or percentage > 100: |
| print('percentage not valid') |
| return |
| |
| self.percentage = percentage |
| print('battery %s percentage %d' % (self.path, self.percentage)) |
| self.PropertiesChanged( |
| BATTERY_PROVIDER_IFACE, self.get_battery_properties()) |
| |
| @dbus.service.method(DBUS_PROP_IFACE, |
| in_signature='s', |
| out_signature='a{sv}') |
| def GetAll(self, interface): |
| if interface != BATTERY_PROVIDER_IFACE: |
| raise InvalidArgsException() |
| |
| return self.get_properties()[BATTERY_PROVIDER_IFACE] |
| |
| @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}') |
| def PropertiesChanged(self, interface, properties): |
| return |
| |
| |
| def add_late_battery(): |
| app.add_battery(Battery(bus, BATTERY_PATH3, 70, 'Protocol 2')) |
| |
| |
| def drain_battery(battery): |
| new_percentage = 100 |
| if battery.percentage != None: |
| new_percentage = battery.percentage - 5 |
| if new_percentage < 0: |
| new_percentage = 0 |
| |
| battery.set_percentage(new_percentage) |
| |
| if new_percentage <= 0: |
| return False |
| |
| return True |
| |
| def register_provider_cb(): |
| print('Battery Provider registered') |
| |
| # Battery added early right after RegisterBatteryProvider succeeds |
| app.add_battery(Battery(bus, BATTERY_PATH2, None)) |
| # Battery added later |
| GObject.timeout_add(5000, add_late_battery) |
| |
| |
| def register_provider_error_cb(error): |
| print('Failed to register Battery Provider: ' + str(error)) |
| mainloop.quit() |
| |
| |
| def find_manager(bus): |
| remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), |
| DBUS_OM_IFACE) |
| objects = remote_om.GetManagedObjects() |
| |
| for o, props in objects.items(): |
| if BATTERY_PROVIDER_MANAGER_IFACE in props.keys(): |
| return o |
| |
| return None |
| |
| |
| def unregister_provider_cb(): |
| print('Battery Provider unregistered') |
| |
| |
| def unregister_provider_error_cb(error): |
| print('Failed to unregister Battery Provider: ' + str(error)) |
| |
| |
| def unregister_battery_provider(battery_provider_manager): |
| battery_provider_manager.UnregisterBatteryProvider(BATTERY_PROVIDER_PATH, |
| reply_handler=unregister_provider_cb, |
| error_handler=unregister_provider_error_cb) |
| |
| |
| def remove_battery(app, battery): |
| app.remove_battery(battery) |
| |
| |
| """ |
| Simulates an application registering to BlueZ as a Battery Provider providing |
| fake batteries drained periodically. |
| """ |
| def main(): |
| global mainloop, bus, app |
| |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| |
| manager_path = find_manager(bus) |
| if not manager_path: |
| print('BatteryProviderManager1 interface not found') |
| return |
| |
| print('BatteryProviderManager1 path = ', manager_path) |
| |
| battery_provider_manager = dbus.Interface( |
| bus.get_object(BLUEZ_SERVICE_NAME, manager_path), |
| BATTERY_PROVIDER_MANAGER_IFACE) |
| |
| app = Application(bus) |
| |
| # Battery pre-added before RegisterBatteryProvider |
| battery1 = Battery(bus, BATTERY_PATH1, 87, 'Protocol 1') |
| app.add_battery(battery1) |
| |
| mainloop = GObject.MainLoop() |
| |
| print('Registering Battery Provider...') |
| |
| battery_provider_manager.RegisterBatteryProvider(BATTERY_PROVIDER_PATH, |
| reply_handler=register_provider_cb, |
| error_handler=register_provider_error_cb) |
| |
| # Unregister the Battery Provider after an arbitrary amount of time |
| GObject.timeout_add( |
| 12000, unregister_battery_provider, battery_provider_manager) |
| # Simulate battery removal by a provider |
| GObject.timeout_add(8000, remove_battery, app, battery1) |
| |
| mainloop.run() |
| |
| |
| if __name__ == '__main__': |
| main() |