| #!/usr/bin/python3 |
| |
| import dbus |
| import dbus.mainloop.glib |
| import sys |
| from gi.repository import GLib |
| import os |
| |
def print_menu():
    """Print the interactive test-case menu to stdout.

    Each entry is written with its own ``print`` call, in the order the
    options are handled by the stdin handler ("0"-"5" and "x").
    """
    separator = "----------------------------------------------------------------"
    menu_lines = (
        "Select test case",
        separator,
        "[0] Activate cbs",
        "[1] Deactivate cbs",
        "[2] Get cbs properties",
        "[3] Set/Register topics",
        # Usage hint for option 3; the embedded newline splits it in two rows.
        " If several - give topics separated with comma. \n E.g. 20,50-51,60",
        "[4] Clear/Unregister topics",
        "[5] NetReg Base Station - Get current serving cell",
        "[x] Exit",
        separator,
    )
    for line in menu_lines:
        print(line)
| |
def property_changed(property, value):
    """Report a changed Cell Broadcast property on stdout.

    Connected in this script to the CellBroadcast "PropertyChanged" signal.

    Args:
        property: Name of the property that changed.
        value: New value of the property.

    An empty "Topics" value gets a dedicated message; every other change is
    printed generically. A prompt to press ENTER follows in both cases.
    """
    topics_cleared = value == "" and property == "Topics"
    if topics_cleared:
        message = ("User selected Topics have been cleared. "
                   "\nRegistered for emergency topics only.")
    else:
        message = "Cell Broadcast property %s is changed to %s" % (property, value)

    print(message)
    print("\nPress ENTER to continue")
| |
def incoming_broadcast(text, topic):
    """Print a received broadcast message and the topic it arrived on.

    Connected in this script to the CellBroadcast "IncomingBroadcast" signal.

    Args:
        text: Body of the broadcast message.
        topic: Topic channel the message was received on.
    """
    report = "Broadcast msg: %s \n Topic channel: %s" % (text, topic)
    for chunk in (report, "\nPress ENTER to continue"):
        print(chunk)
| |
def emergency_broadcast(text, properties):
    """Print a received emergency broadcast and its attributes.

    Connected in this script to the CellBroadcast "EmergencyBroadcast" signal.

    Args:
        text: Body of the emergency message.
        properties: Mapping that must contain the keys "EmergencyType",
            "EmergencyAlert" and "Popup".

    Raises:
        KeyError: If one of the expected keys is missing from ``properties``.
    """
    emerg_type = properties["EmergencyType"]
    emerg_alert = properties["EmergencyAlert"]
    # Read the popup flag once; it is both printed and tested below.
    popup = properties["Popup"]

    print("Broadcast msg: %s \n\t Type: %s \n\t Alert: %s \n\t Popup: %s"
          % (text, emerg_type, emerg_alert, popup))

    if popup == True:
        print("Popup required.")

    print("\nPress ENTER to continue")
| |
def set_cbs_state(cbs, state):
    """Switch cell broadcast on or off through the "Powered" property.

    Args:
        cbs: Object exposing ``SetProperty(name, value)``; the script passes
            the org.ofono.CellBroadcast D-Bus interface.
        state: ``True`` to activate; any value not equal to ``True``
            deactivates.
    """
    # Equality (not truthiness) is kept on purpose so that only values equal
    # to True select the "activate" path.
    activate = state == True
    if activate:
        banner = "Activating cell broadcast..."
    else:
        banner = "Deactivating cell broadcast..."

    print(banner)
    cbs.SetProperty("Powered", dbus.Boolean(1 if activate else 0))
    print("-----------------------------------------------------------")
| |
def print_cbs_properties(cbs):
    """Fetch the Cell Broadcast properties and print them.

    Args:
        cbs: Object exposing ``GetProperties()`` that returns a mapping of
            property name to value; the script passes the
            org.ofono.CellBroadcast D-Bus interface.

    "Powered" and "Topics" are reported when their value has a non-empty
    string form. Any property whose string form is empty is reported as
    empty. Other non-empty properties are not printed.
    """
    properties = cbs.GetProperties()
    print("---------------------PROPERTIES----------------------------")
    for name, value in properties.items():
        # Emptiness is judged on the string form, not on truthiness: a
        # "Powered" value of False is falsy but must still be reported.
        if not str(value):
            print("Cell Broadcast %s value empty" % (name))
            continue

        if name == "Powered":
            if value:
                print("Cell Broadcast is Activated.")
            else:
                print("Cell Broadcast is Deactivated.")
        elif name == "Topics":
            print("Currently set CBS %s are: %s" % (name, value))
    print("-----------------------------------------------------------")
| |
def set_topics(cbs):
    """Prompt for topic IDs, validate them and set the "Topics" property.

    The current properties are printed first, then the user is asked for a
    topic string such as ``20,50-51,60``. Only the digits ``0``-``9`` and the
    separators ``,`` and ``-`` are accepted, and every topic ID must be at
    most 999. On the first violation an error is printed and nothing is sent.

    Args:
        cbs: Object exposing ``GetProperties()`` and
            ``SetProperty(name, value)``; the script passes the
            org.ofono.CellBroadcast D-Bus interface.
    """
    print_cbs_properties(cbs)

    topics = input('Enter the topic ID(s) you want to register to: ')

    invalid_data = False
    current_id = ""  # digits of the topic ID currently being read
    for char in topics:
        if char == ',' or char == '-':
            # A separator ends the current ID; the next digit starts a new one.
            current_id = ""
        elif '0' <= char <= '9':
            current_id += char
            if int(current_id) > 999:
                print("Invalid Topic ID %s (range 0-999). "
                      "\nCould not register." % current_id)
                invalid_data = True
                # Stop at the first out-of-range ID so the error is reported
                # once instead of once per additional digit.
                break
        else:
            print("Invalid char. \"%s\" entered. Topic not set." % (char))
            invalid_data = True
            break

    if not invalid_data:
        try:
            print("Setting Cell Broadcast topics...")
            cbs.SetProperty("Topics", topics)
        except dbus.DBusException as e:
            print("Unable to set topic: %s" % e)

    print("-----------------------------------------------------------")
| |
def get_serving_cell_name(netReg):
    """Print the current serving cell name from the "BaseStation" property.

    Args:
        netReg: Object exposing ``GetProperties()`` that returns a mapping of
            property name to value; the script passes the
            org.ofono.NetworkRegistration D-Bus interface.

    Exactly one of three messages is printed: the cell name, a notice that
    the name is empty, or a notice that the property is missing. A separator
    line follows.
    """
    properties = netReg.GetProperties()

    if "BaseStation" not in properties:
        print("Base Station parameter not found. "
              "\nBase Station CBS not available.")
    elif str(properties["BaseStation"]):
        print("Current serving cell name: %s" % (properties["BaseStation"]))
    else:
        # Present but empty: report only that, not "parameter not found".
        print("Current Serving cell name empty. "
              "Base Station CBS not available.")
    print("-----------------------------------------------------------")
| |
def stdin_handler(channel, condition, cbs, netReg):
    """Handle one line of user input from the watched channel.

    Reads up to 160 bytes from the channel's file descriptor, strips trailing
    whitespace, decodes as UTF-8 and runs the matching menu action.
    Unrecognised input runs no action. The menu is printed again afterwards.

    Args:
        channel: I/O channel providing ``unix_get_fd()``.
        condition: I/O condition that triggered the callback (unused).
        cbs: Cell Broadcast interface passed on to the menu actions.
        netReg: Network Registration interface used by option "5".

    Returns:
        Always ``True``. For a GLib watch callback this keeps the watch
        installed.
    """
    raw = os.read(channel.unix_get_fd(), 160)
    in_key = raw.rstrip().decode('UTF-8')

    # Menu key -> deferred action; lambdas delay the call until dispatch.
    actions = {
        '0': lambda: set_cbs_state(cbs, True),
        '1': lambda: set_cbs_state(cbs, False),
        '2': lambda: print_cbs_properties(cbs),
        '3': lambda: set_topics(cbs),
        '4': lambda: cbs.SetProperty("Topics", ""),
        '5': lambda: get_serving_cell_name(netReg),
        'x': lambda: sys.exit(1),
    }
    action = actions.get(in_key)
    if action is not None:
        action()

    print('\n' * 2)
    print_menu()

    return True
| |
if __name__ == "__main__":
    # Script entry point: connect to oFono on the system bus, subscribe to
    # the Cell Broadcast signals of the first modem and serve the menu from
    # stdin inside a GLib main loop.

    # Must be set up before the bus connection so signals reach the GLib loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    manager = dbus.Interface(bus.get_object('org.ofono', '/'),
                             'org.ofono.Manager')

    modems = manager.GetModems()
    if not modems:
        # Without this guard modems[0][0] fails with a bare IndexError.
        sys.exit("No modems found")
    # Each entry is indexed as (path, ...); the first modem's path is used.
    path = modems[0][0]

    cbs = dbus.Interface(bus.get_object('org.ofono', path),
                         'org.ofono.CellBroadcast')

    netReg = dbus.Interface(bus.get_object('org.ofono', path),
                            'org.ofono.NetworkRegistration')

    cbs.connect_to_signal("PropertyChanged", property_changed)
    cbs.connect_to_signal("IncomingBroadcast", incoming_broadcast)
    cbs.connect_to_signal("EmergencyBroadcast", emergency_broadcast)

    print('\n' * 2)

    print_menu()

    # Call stdin_handler(channel, condition, cbs, netReg) when stdin is
    # readable.
    GLib.io_add_watch(GLib.IOChannel(filedes=sys.stdin.fileno()),
                      GLib.PRIORITY_DEFAULT, GLib.IO_IN, stdin_handler, cbs,
                      netReg)

    mainloop = GLib.MainLoop()
    mainloop.run()