| #!/usr/bin/python |
| |
| import gobject |
| |
| import dbus |
| import dbus.mainloop.glib |
| |
| _dbus2py = { |
| dbus.String : unicode, |
| dbus.UInt32 : int, |
| dbus.Int32 : int, |
| dbus.Int16 : int, |
| dbus.UInt16 : int, |
| dbus.UInt64 : int, |
| dbus.Int64 : int, |
| dbus.Byte : int, |
| dbus.Boolean : bool, |
| dbus.ByteArray : str, |
| dbus.ObjectPath : str |
| } |
| |
| def dbus2py(d): |
| t = type(d) |
| if t in _dbus2py: |
| return _dbus2py[t](d) |
| if t is dbus.Dictionary: |
| return dict([(dbus2py(k), dbus2py(v)) for k, v in d.items()]) |
| if t is dbus.Array and d.signature == "y": |
| return "".join([chr(b) for b in d]) |
| if t is dbus.Array or t is list: |
| return [dbus2py(v) for v in d] |
| if t is dbus.Struct or t is tuple: |
| return tuple([dbus2py(v) for v in d]) |
| return d |
| |
| def pretty(d): |
| d = dbus2py(d) |
| t = type(d) |
| |
| if t in (dict, tuple, list) and len(d) > 0: |
| if t is dict: |
| d = ", ".join(["%s = %s" % (k, pretty(v)) |
| for k, v in d.items()]) |
| return "{ %s }" % d |
| |
| d = " ".join([pretty(e) for e in d]) |
| |
| if t is tuple: |
| return "( %s )" % d |
| |
| return str(d) |
| |
| def property_changed(name, value, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s = %s" % (iface, path, name, pretty(value)) |
| |
| def added(name, value, member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s %s" % (iface, member, name, pretty(value)) |
| |
| def removed(name, member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s" % (iface, name, member) |
| |
| def event(member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s" % (iface, path, member) |
| |
| def message(msg, args, member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s %s (%s)" % (iface, path, member, |
| str(msg), pretty(args)) |
| |
| def ussd(msg, member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s %s" % (iface, path, member, str(msg)) |
| |
| def value(value, member, path, interface): |
| iface = interface[interface.rfind(".") + 1:] |
| print "{%s} [%s] %s %s" % (iface, path, member, str(value)) |
| |
| if __name__ == '__main__': |
| dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) |
| |
| bus = dbus.SystemBus() |
| |
| bus.add_signal_receiver(property_changed, |
| bus_name="org.ofono", |
| signal_name = "PropertyChanged", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| for member in ["IncomingBarringInEffect", |
| "OutgoingBarringInEffect", |
| "NearMaximumWarning"]: |
| bus.add_signal_receiver(event, |
| bus_name="org.ofono", |
| signal_name = member, |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(added, |
| bus_name="org.ofono", |
| signal_name = "ModemAdded", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(removed, |
| bus_name="org.ofono", |
| signal_name = "ModemRemoved", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(added, |
| bus_name="org.ofono", |
| signal_name = "ContextAdded", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(removed, |
| bus_name="org.ofono", |
| signal_name = "ContextRemoved", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(added, |
| bus_name="org.ofono", |
| signal_name = "CallAdded", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| bus.add_signal_receiver(removed, |
| bus_name="org.ofono", |
| signal_name = "CallRemoved", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(added, |
| bus_name="org.ofono", |
| signal_name = "MessageAdded", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| bus.add_signal_receiver(removed, |
| bus_name="org.ofono", |
| signal_name = "MessageRemoved", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| bus.add_signal_receiver(value, |
| bus_name="org.ofono", |
| signal_name = "DisconnectReason", |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| for member in ["IncomingBroadcast", "EmergencyBroadcast", |
| "IncomingMessage", "ImmediateMessage"]: |
| bus.add_signal_receiver(message, |
| bus_name="org.ofono", |
| signal_name = member, |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| for member in ["NotificationReceived", "RequestReceived"]: |
| bus.add_signal_receiver(ussd, |
| bus_name="org.ofono", |
| signal_name = member, |
| member_keyword="member", |
| path_keyword="path", |
| interface_keyword="interface") |
| |
| mainloop = gobject.MainLoop() |
| mainloop.run() |